Skip to content

Commit

Permalink
[core] Add file compression type in file names (#4420)
Browse files Browse the repository at this point in the history
  • Loading branch information
askwang authored Nov 4, 2024
1 parent 5f00ad6 commit f561137
Show file tree
Hide file tree
Showing 18 changed files with 187 additions and 20 deletions.
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/core_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,12 @@
<td>Map</td>
<td>Define different file format for different level, you can add the conf like this: 'file.format.per.level' = '0:avro,3:parquet', if the file format for level is not provided, the default format which set by `file.format` will be used.</td>
</tr>
<tr>
<td><h5>file.suffix.include.compression</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>Whether to add file compression type in the file name of data file and changelog file.</td>
</tr>
<tr>
<td><h5>force-lookup</h5></td>
<td style="word-wrap: break-word;">false</td>
Expand Down
11 changes: 11 additions & 0 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,13 @@ public class CoreOptions implements Serializable {
.defaultValue("changelog-")
.withDescription("Specify the file name prefix of changelog files.");

/**
 * Option {@code file.suffix.include.compression}: controls whether the file compression type is
 * added to the names of data files and changelog files. Defaults to {@code false}.
 */
public static final ConfigOption<Boolean> FILE_SUFFIX_INCLUDE_COMPRESSION =
key("file.suffix.include.compression")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to add file compression type in the file name of data file and changelog file.");

public static final ConfigOption<MemorySize> FILE_BLOCK_SIZE =
key("file.block-size")
.memoryType()
Expand Down Expand Up @@ -1591,6 +1598,10 @@ public String changelogFilePrefix() {
return options.get(CHANGELOG_FILE_PREFIX);
}

/** Returns the configured value of {@code FILE_SUFFIX_INCLUDE_COMPRESSION}. */
public boolean fileSuffixIncludeCompression() {
return options.get(FILE_SUFFIX_INCLUDE_COMPRESSION);
}

/** Returns the configured value of {@code FIELDS_DEFAULT_AGG_FUNC}. */
public String fieldsDefaultFunc() {
return options.get(FIELDS_DEFAULT_AGG_FUNC);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,9 @@ public FileStorePathFactory pathFactory() {
options.fileFormat().getFormatIdentifier(),
options.dataFilePrefix(),
options.changelogFilePrefix(),
options.legacyPartitionName());
options.legacyPartitionName(),
options.fileSuffixIncludeCompression(),
options.fileCompression());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,9 @@ private Map<String, FileStorePathFactory> format2PathFactory() {
format,
options.dataFilePrefix(),
options.changelogFilePrefix(),
options.legacyPartitionName())));
options.legacyPartitionName(),
options.fileSuffixIncludeCompression(),
options.fileCompression())));
return pathFactoryMap;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,24 @@ public class DataFilePathFactory {
private final String formatIdentifier;
private final String dataFilePrefix;
private final String changelogFilePrefix;
private final boolean fileSuffixIncludeCompression;
private final String fileCompression;

/**
 * Creates a path factory rooted at {@code parent}.
 *
 * <p>Each instance draws a fresh random UUID and starts its path counter at zero.
 *
 * @param parent directory under which generated paths are created
 * @param formatIdentifier file format identifier, used as the final file name extension
 * @param dataFilePrefix prefix stored for data file names (presumably passed to
 *     {@code newPath(String)}; the call site is not visible here)
 * @param changelogFilePrefix prefix stored for changelog file names (presumably passed to
 *     {@code newPath(String)}; the call site is not visible here)
 * @param fileSuffixIncludeCompression whether the compression type is placed in front of the
 *     format extension of generated file names
 * @param fileCompression compression type written into the file name when
 *     {@code fileSuffixIncludeCompression} is {@code true}
 */
public DataFilePathFactory(
        Path parent,
        String formatIdentifier,
        String dataFilePrefix,
        String changelogFilePrefix,
        boolean fileSuffixIncludeCompression,
        String fileCompression) {
    this.parent = parent;
    this.uuid = UUID.randomUUID().toString();
    this.pathCount = new AtomicInteger(0);
    this.formatIdentifier = formatIdentifier;
    this.dataFilePrefix = dataFilePrefix;
    this.changelogFilePrefix = changelogFilePrefix;
    this.fileSuffixIncludeCompression = fileSuffixIncludeCompression;
    this.fileCompression = fileCompression;
}

public Path newPath() {
Expand All @@ -62,7 +68,13 @@ public Path newChangelogPath() {
}

/**
 * Builds the next file path under {@code parent} for the given name prefix.
 *
 * <p>The file name is {@code prefix + uuid + "-" + counter + extension}. The extension is
 * {@code "." + fileCompression + "." + formatIdentifier} when
 * {@code fileSuffixIncludeCompression} is set, and {@code "." + formatIdentifier} otherwise.
 *
 * @param prefix leading part of the generated file name
 * @return a new path whose name uses the current counter value
 */
private Path newPath(String prefix) {
    String extension;
    if (fileSuffixIncludeCompression) {
        extension = "." + fileCompression + "." + formatIdentifier;
    } else {
        extension = "." + formatIdentifier;
    }
    // The counter must advance exactly once per generated path.
    String name = prefix + uuid + "-" + pathCount.getAndIncrement() + extension;
    return new Path(parent, name);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public class FileStorePathFactory {
private final String formatIdentifier;
private final String dataFilePrefix;
private final String changelogFilePrefix;
private final boolean fileSuffixIncludeCompression;
private final String fileCompression;

private final AtomicInteger manifestFileCount;
private final AtomicInteger manifestListCount;
Expand All @@ -57,7 +59,9 @@ public FileStorePathFactory(
String formatIdentifier,
String dataFilePrefix,
String changelogFilePrefix,
boolean legacyPartitionName) {
boolean legacyPartitionName,
boolean fileSuffixIncludeCompression,
String fileCompression) {
this.root = root;
this.uuid = UUID.randomUUID().toString();

Expand All @@ -66,6 +70,8 @@ public FileStorePathFactory(
this.formatIdentifier = formatIdentifier;
this.dataFilePrefix = dataFilePrefix;
this.changelogFilePrefix = changelogFilePrefix;
this.fileSuffixIncludeCompression = fileSuffixIncludeCompression;
this.fileCompression = fileCompression;

this.manifestFileCount = new AtomicInteger(0);
this.manifestListCount = new AtomicInteger(0);
Expand Down Expand Up @@ -113,7 +119,9 @@ public DataFilePathFactory createDataFilePathFactory(BinaryRow partition, int bu
bucketPath(partition, bucket),
formatIdentifier,
dataFilePrefix,
changelogFilePrefix);
changelogFilePrefix,
fileSuffixIncludeCompression,
fileCompression);
}

public Path bucketPath(BinaryRow partition, int bucket) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,9 @@ private DataFilePathFactory createPathFactory() {
new Path(tempDir + "/dt=" + PART + "/bucket-0"),
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue());
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());
}

private AppendOnlyWriter createEmptyWriter(long targetFileSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@ public void testFileSuffix(@TempDir java.nio.file.Path tempDir) throws Exception
new Path(tempDir + "/dt=1/bucket-1"),
format,
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue());
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());
FileFormat fileFormat = FileFormat.fromIdentifier(format, new Options());
LinkedList<DataFileMeta> toCompact = new LinkedList<>();
CoreOptions options = new CoreOptions(new HashMap<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@ public void testNoPartition() {
new Path(tempDir + "/bucket-123"),
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue());
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());
String uuid = pathFactory.uuid();

for (int i = 0; i < 20; i++) {
Expand All @@ -64,7 +66,9 @@ public void testWithPartition() {
new Path(tempDir + "/dt=20211224/bucket-123"),
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue());
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());
String uuid = pathFactory.uuid();

for (int i = 0; i < 20; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,9 @@ protected KeyValueFileWriterFactory createWriterFactory(String pathStr, String f
format,
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
FileIO fileIO = FileIOFinder.find(path);
Options options = new Options();
Expand All @@ -246,7 +248,9 @@ protected KeyValueFileWriterFactory createWriterFactory(String pathStr, String f
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue()));
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue()));

return KeyValueFileWriterFactory.builder(
fileIO,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ public void initialize(String identifier, boolean statsDenseStore) {
.toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX
.defaultValue())
.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION
.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue())
.newPath(),
SCHEMA,
fileFormat
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ protected ManifestFile createManifestFile(String pathStr) {
CoreOptions.FILE_FORMAT.defaultValue(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue()),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue()),
Long.MAX_VALUE,
null)
.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ private ManifestFile createManifestFile(String pathStr) {
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());
int suggestedFileSize = ThreadLocalRandom.current().nextInt(8192) + 1024;
FileIO fileIO = FileIOFinder.find(path);
return new ManifestFile.Factory(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ private ManifestList createManifestList(String pathStr) {
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());
return new ManifestList.Factory(FileIOFinder.find(path), avro, "zstd", pathFactory, null)
.create();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ public void testCreateDataFilePathFactoryWithPartition() {
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());

assertPartition("20211224", 16, pathFactory, "/dt=20211224/hr=16");
assertPartition("20211224", null, pathFactory, "/dt=20211224/hr=default");
Expand Down Expand Up @@ -126,6 +128,8 @@ public static FileStorePathFactory createNonPartFactory(Path root) {
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ public TestChangelogDataReadWrite(String root) {
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());
this.snapshotManager = new SnapshotManager(LocalFileIO.create(), new Path(root));
this.commitUser = UUID.randomUUID().toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,9 @@ protected void foreachIndexReader(Consumer<FileIndexReader> consumer)
CoreOptions.FILE_FORMAT.defaultValue().toString(),
CoreOptions.DATA_FILE_PREFIX.defaultValue(),
CoreOptions.CHANGELOG_FILE_PREFIX.defaultValue(),
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue());
CoreOptions.PARTITION_GENERATE_LEGCY_NAME.defaultValue(),
CoreOptions.FILE_SUFFIX_INCLUDE_COMPRESSION.defaultValue(),
CoreOptions.FILE_COMPRESSION.defaultValue());

Table table = fileSystemCatalog.getTable(Identifier.create("db", "T"));
ReadBuilder readBuilder = table.newReadBuilder();
Expand Down
Loading

0 comments on commit f561137

Please sign in to comment.