From cb0bf56f8d3b03ec2f2c4b67896de16486e54b71 Mon Sep 17 00:00:00 2001 From: shidayang <530847445@qq.com> Date: Wed, 18 Jan 2023 16:50:00 +0800 Subject: [PATCH 1/4] Adapt new Transaction model --- .../ams/server/optimize/FullOptimizePlan.java | 7 +- .../server/optimize/MinorOptimizePlan.java | 8 +- .../server/optimize/SupportHiveCommit.java | 3 +- .../com/netease/arctic/data/DataTreeNode.java | 3 + .../netease/arctic/data/DefaultKeyedFile.java | 53 +----- .../com/netease/arctic/io/FileNameHandle.java | 151 ++++++++++++++++++ .../arctic/scan/BaseArcticFileScanTask.java | 3 +- .../scan/BaseChangeTableIncrementalScan.java | 4 +- .../arctic/utils/ConvertStructUtil.java | 5 +- .../netease/arctic/utils/TableFileUtils.java | 48 ------ .../arctic/data/KeyedDataFileTest.java | 2 +- .../netease/arctic/io/FileNameHandleTest.java | 91 +++++++++++ .../arctic/io/TableTestBaseWithInitData.java | 2 - .../arctic/scan/TableEntriesScanTest.java | 5 +- .../netease/arctic/utils/FileUtilTest.java | 17 -- 15 files changed, 272 insertions(+), 130 deletions(-) create mode 100644 core/src/main/java/com/netease/arctic/io/FileNameHandle.java create mode 100644 core/src/test/java/com/netease/arctic/io/FileNameHandleTest.java diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/FullOptimizePlan.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/FullOptimizePlan.java index be49a09009..2060768463 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/FullOptimizePlan.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/FullOptimizePlan.java @@ -28,6 +28,7 @@ import com.netease.arctic.ams.server.utils.ContentFileUtil; import com.netease.arctic.data.DataFileType; import com.netease.arctic.data.DataTreeNode; +import com.netease.arctic.io.FileNameHandle; import com.netease.arctic.table.ArcticTable; import com.netease.arctic.table.TableProperties; import com.netease.arctic.table.UnkeyedTable; @@ -219,12 +220,12 @@ private List collectKeyedTableTasks(String partition, FileTree if (!baseFiles.isEmpty()) { List sourceNodes = Collections.singletonList(subTree.getNode()); Set baseFileNodes = baseFiles.stream() - .map(dataFile -> TableFileUtils.parseFileNodeFromFileName(dataFile.path().toString())) + .map(dataFile -> FileNameHandle.parseFileNodeFromFileName(dataFile.path().toString())) .collect(Collectors.toSet()); List posDeleteFiles = partitionPosDeleteFiles .computeIfAbsent(partition, e -> Collections.emptyList()).stream() .filter(deleteFile -> - baseFileNodes.contains(TableFileUtils.parseFileNodeFromFileName(deleteFile.path().toString()))) + baseFileNodes.contains(FileNameHandle.parseFileNodeFromFileName(deleteFile.path().toString()))) .collect(Collectors.toList()); if (nodeTaskNeedBuild(posDeleteFiles, baseFiles)) { @@ -288,7 +289,7 @@ private void addBaseFileIntoFileTree() { private long getMaxTransactionId(List dataFiles) { OptionalLong maxTransactionId = dataFiles.stream() - .mapToLong(file -> TableFileUtils.parseFileTidFromFileName(file.path().toString())).max(); + .mapToLong(file -> FileNameHandle.parseBase(file.path().toString()).transactionId()).max(); if (maxTransactionId.isPresent()) { return maxTransactionId.getAsLong(); } diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/MinorOptimizePlan.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/MinorOptimizePlan.java index dadc24f8c3..affa461fd0 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/MinorOptimizePlan.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/MinorOptimizePlan.java @@ -28,6 +28,7 @@ import com.netease.arctic.ams.server.utils.ContentFileUtil; import com.netease.arctic.data.DataFileType; import com.netease.arctic.data.DataTreeNode; +import com.netease.arctic.io.FileNameHandle; import com.netease.arctic.table.ArcticTable; import com.netease.arctic.table.KeyedTable; import com.netease.arctic.table.TableProperties; @@ -247,7 +248,8 @@ public long getTransactionId() { public long getLegacyTransactionId() { if (legacyTransactionId == -1) { - legacyTransactionId = TableFileUtils.parseFileTidFromFileName(dataFileInfo.getPath()); + legacyTransactionId = FileNameHandle.parseChange(dataFileInfo.getPath(), + dataFileInfo.getSequence()).transactionId(); } return legacyTransactionId; } @@ -338,12 +340,12 @@ private List collectKeyedTableTasks(String partition, FileTree sourceNodes = Collections.singletonList(subTree.getNode()); } Set baseFileNodes = baseFiles.stream() - .map(dataFile -> TableFileUtils.parseFileNodeFromFileName(dataFile.path().toString())) + .map(dataFile -> FileNameHandle.parseFileNodeFromFileName(dataFile.path().toString())) .collect(Collectors.toSet()); List posDeleteFiles = partitionPosDeleteFiles .computeIfAbsent(partition, e -> Collections.emptyList()).stream() .filter(deleteFile -> - baseFileNodes.contains(TableFileUtils.parseFileNodeFromFileName(deleteFile.path().toString()))) + baseFileNodes.contains(FileNameHandle.parseFileNodeFromFileName(deleteFile.path().toString()))) .collect(Collectors.toList()); // if no insert files and no eq-delete file, skip if (CollectionUtils.isEmpty(insertFiles) && CollectionUtils.isEmpty(deleteFiles)) { diff --git a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/SupportHiveCommit.java b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/SupportHiveCommit.java index fd307a4241..5d0bd5b735 100644 --- a/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/SupportHiveCommit.java +++ b/ams/ams-server/src/main/java/com/netease/arctic/ams/server/optimize/SupportHiveCommit.java @@ -27,6 +27,7 @@ import com.netease.arctic.hive.utils.HivePartitionUtil; import com.netease.arctic.hive.utils.HiveTableUtil; import com.netease.arctic.hive.utils.TableTypeUtil; +import com.netease.arctic.io.FileNameHandle; import com.netease.arctic.table.ArcticTable; import com.netease.arctic.utils.IdGenerator; import com.netease.arctic.utils.SerializationUtils; @@ -82,7 +83,7 @@ public boolean commit(long baseSnapshotId) throws Exception { .map(fileByte -> (DataFile) SerializationUtils.toInternalTableFile(fileByte)) .collect(Collectors.toList()); long maxTransactionId = targetFiles.stream() - .mapToLong(dataFile -> TableFileUtils.parseFileTidFromFileName(dataFile.path().toString())) + .mapToLong(dataFile -> FileNameHandle.parseBase(dataFile.path().toString()).transactionId()) .max() .orElse(0L); diff --git a/core/src/main/java/com/netease/arctic/data/DataTreeNode.java b/core/src/main/java/com/netease/arctic/data/DataTreeNode.java index eb1d256ea3..2a3de8f196 100644 --- a/core/src/main/java/com/netease/arctic/data/DataTreeNode.java +++ b/core/src/main/java/com/netease/arctic/data/DataTreeNode.java @@ -41,6 +41,9 @@ public final class DataTreeNode implements Serializable { public static final DataTreeNode ROOT = new DataTreeNode(0, 0); public static DataTreeNode of(long mask, long index) { + if (index > mask) { + throw new IllegalArgumentException("index can not be greater than mask"); + } return new DataTreeNode(mask, index); } diff --git a/core/src/main/java/com/netease/arctic/data/DefaultKeyedFile.java b/core/src/main/java/com/netease/arctic/data/DefaultKeyedFile.java index f5dbab7679..830f649577 100644 --- a/core/src/main/java/com/netease/arctic/data/DefaultKeyedFile.java +++ b/core/src/main/java/com/netease/arctic/data/DefaultKeyedFile.java @@ -18,6 +18,7 @@ package com.netease.arctic.data; +import com.netease.arctic.io.FileNameHandle; import com.netease.arctic.utils.TableFileUtils; import org.apache.iceberg.DataFile; import org.apache.iceberg.FileFormat; @@ -47,12 +48,12 @@ private DefaultKeyedFile(DataFile internalFile, FileMeta meta) { } public static DefaultKeyedFile parseChange(DataFile dataFile, Long sequenceNumber) { - FileMeta fileMeta = FileMeta.parseChange(dataFile.path().toString(), sequenceNumber); + FileMeta fileMeta = FileNameHandle.parseChange(dataFile.path().toString(), sequenceNumber); return new DefaultKeyedFile(dataFile, fileMeta); } public static DefaultKeyedFile parseBase(DataFile dataFile) { - FileMeta fileMeta = FileMeta.parseBase(dataFile.path().toString()); + FileMeta fileMeta = FileNameHandle.parseBase(dataFile.path().toString()); return new DefaultKeyedFile(dataFile, fileMeta); } @@ -175,14 +176,11 @@ public int hashCode() { public static class FileMeta { - private static final String KEYED_FILE_NAME_PATTERN_STRING = "(\\d+)-(\\w+)-(\\d+)-(\\d+)-(\\d+)-(\\d+)\\.\\w+"; - private static final Pattern KEYED_FILE_NAME_PATTERN = Pattern.compile(KEYED_FILE_NAME_PATTERN_STRING); - private final long transactionId; private final DataFileType type; private final DataTreeNode node; - private FileMeta(Long transactionId, DataFileType type, DataTreeNode node) { + public FileMeta(Long transactionId, DataFileType type, DataTreeNode node) { this.transactionId = transactionId; this.type = type; this.node = node; @@ -199,48 +197,5 @@ public DataFileType type() { public DataTreeNode node() { return node; } - - /** - * Flink write transactionId as 0. - * if we get transactionId from path is 0, we set transactionId as iceberg sequenceNumber. - * @param path file path - * @param sequenceNumber iceberg sequenceNumber - */ - public static FileMeta parseChange(String path, Long sequenceNumber) { - String fileName = TableFileUtils.getFileName(path); - Matcher matcher = KEYED_FILE_NAME_PATTERN.matcher(fileName); - long nodeId = 1; - DataFileType type = null; - long transactionId = 0L; - if (matcher.matches()) { - nodeId = Long.parseLong(matcher.group(1)); - type = DataFileType.ofShortName(matcher.group(2)); - transactionId = Long.parseLong(matcher.group(3)); - transactionId = transactionId == 0 ? sequenceNumber : transactionId; - } - DataTreeNode node = DataTreeNode.ofId(nodeId); - return new DefaultKeyedFile.FileMeta(transactionId, type, node); - } - - /** - * Path writen by hive can not be pared by arctic format. so we set it transactionId as 0. - * @param path file path - */ - public static FileMeta parseBase(String path) { - String fileName = TableFileUtils.getFileName(path); - Matcher matcher = KEYED_FILE_NAME_PATTERN.matcher(fileName); - long nodeId = 1; - DataFileType type = DataFileType.BASE_FILE; - long transactionId = 0L; - if (matcher.matches()) { - nodeId = Long.parseLong(matcher.group(1)); - type = DataFileType.ofShortName(matcher.group(2)); - transactionId = Long.parseLong(matcher.group(3)); - } else { - transactionId = 0; - } - DataTreeNode node = DataTreeNode.ofId(nodeId); - return new DefaultKeyedFile.FileMeta(transactionId, type, node); - } } } diff --git a/core/src/main/java/com/netease/arctic/io/FileNameHandle.java b/core/src/main/java/com/netease/arctic/io/FileNameHandle.java new file mode 100644 index 0000000000..2b9fbc331a --- /dev/null +++ b/core/src/main/java/com/netease/arctic/io/FileNameHandle.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netease.arctic.io; + +import com.netease.arctic.data.DataFileType; +import com.netease.arctic.data.DataTreeNode; +import com.netease.arctic.data.DefaultKeyedFile; +import com.netease.arctic.io.writer.TaskWriterKey; +import com.netease.arctic.utils.IdGenerator; +import com.netease.arctic.utils.TableFileUtils; +import org.apache.iceberg.FileFormat; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class FileNameHandle { + + private static final String KEYED_FILE_NAME_PATTERN_STRING = "(\\d+)-(\\w+)-(\\d+)-(\\d+)-(\\d+)-.*"; + private static final Pattern KEYED_FILE_NAME_PATTERN = Pattern.compile(KEYED_FILE_NAME_PATTERN_STRING); + + private static final String FORMAT = "%d-%s-%d-%05d-%d-%s-%05d"; + + private FileFormat fileFormat; + private final int partitionId; + private final long taskId; + private final long transactionId; + + // uuid avoid duplicated file name + private final String operationId = IdGenerator.randomId() + ""; + private final AtomicLong fileCount = new AtomicLong(0); + + public FileNameHandle( + FileFormat fileFormat, + int partitionId, + Long taskId, + Long transactionId) { + this.fileFormat = fileFormat; + this.partitionId = partitionId; + this.taskId = taskId; + this.transactionId = transactionId == null ? 0 : transactionId; + } + + public String fileName(TaskWriterKey key) { + return fileFormat.addExtension( + String.format(FORMAT, key.getTreeNode().getId(), key.getFileType().shortName(), + transactionId, partitionId, taskId, operationId, fileCount.incrementAndGet())); + } + + /** + * Flink write transactionId as 0. + * if we get transactionId from path is 0, we set transactionId as iceberg sequenceNumber. + * @param path file path + * @param sequenceNumber iceberg sequenceNumber + */ + public static DefaultKeyedFile.FileMeta parseChange(String path, Long sequenceNumber) { + String fileName = TableFileUtils.getFileName(path); + Matcher matcher = KEYED_FILE_NAME_PATTERN.matcher(fileName); + long nodeId = 1; + DataFileType type = null; + long transactionId = 0L; + if (matcher.matches()) { + nodeId = Long.parseLong(matcher.group(1)); + type = DataFileType.ofShortName(matcher.group(2)); + transactionId = Long.parseLong(matcher.group(3)); + transactionId = transactionId == 0 ? sequenceNumber : transactionId; + } + DataTreeNode node = DataTreeNode.ofId(nodeId); + return new DefaultKeyedFile.FileMeta(transactionId, type, node); + } + + /** + * Path writen by hive can not be pared by arctic format. so we set it transactionId as 0. + * @param path file path + */ + public static DefaultKeyedFile.FileMeta parseBase(String path) { + String fileName = TableFileUtils.getFileName(path); + Matcher matcher = KEYED_FILE_NAME_PATTERN.matcher(fileName); + long nodeId = 1; + DataFileType type = DataFileType.BASE_FILE; + long transactionId = 0L; + if (matcher.matches()) { + nodeId = Long.parseLong(matcher.group(1)); + type = DataFileType.ofShortName(matcher.group(2)); + if (type == DataFileType.INSERT_FILE) { + type = DataFileType.BASE_FILE; + } + transactionId = Long.parseLong(matcher.group(3)); + } else { + transactionId = 0; + } + DataTreeNode node = DataTreeNode.ofId(nodeId); + return new DefaultKeyedFile.FileMeta(transactionId, type, node); + } + + public static DataFileType parseFileTypeForChange(String path) { + String fileName = TableFileUtils.getFileName(path); + Matcher matcher = KEYED_FILE_NAME_PATTERN.matcher(fileName); + DataFileType type; + if (matcher.matches()) { + type = DataFileType.ofShortName(matcher.group(2)); + } else { + throw new IllegalArgumentException("path is illegal"); + } + return type; + } + + public static DataFileType parseFileTypeForBase(String path) { + String fileName = TableFileUtils.getFileName(path); + Matcher matcher = KEYED_FILE_NAME_PATTERN.matcher(fileName); + DataFileType type = DataFileType.BASE_FILE; + if (matcher.matches()) { + type = DataFileType.ofShortName(matcher.group(2)); + if (type == DataFileType.INSERT_FILE) { + type = DataFileType.BASE_FILE; + } + } + return type; + } + + /** + * parse keyed file node id from file name + * @param fileName fileName + * @return node id + */ + public static DataTreeNode parseFileNodeFromFileName(String fileName) { + fileName = TableFileUtils.getFileName(fileName); + Matcher matcher = KEYED_FILE_NAME_PATTERN.matcher(fileName); + long nodeId = 1; + if (matcher.matches()) { + nodeId = Long.parseLong(matcher.group(1)); + } + return DataTreeNode.ofId(nodeId); + } +} diff --git a/core/src/main/java/com/netease/arctic/scan/BaseArcticFileScanTask.java b/core/src/main/java/com/netease/arctic/scan/BaseArcticFileScanTask.java index cd2d3568f3..29cf20103e 100644 --- a/core/src/main/java/com/netease/arctic/scan/BaseArcticFileScanTask.java +++ b/core/src/main/java/com/netease/arctic/scan/BaseArcticFileScanTask.java @@ -20,6 +20,7 @@ import com.netease.arctic.data.DefaultKeyedFile; import com.netease.arctic.data.PrimaryKeyedFile; +import com.netease.arctic.io.FileNameHandle; import com.netease.arctic.utils.TableFileUtils; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileScanTask; @@ -56,7 +57,7 @@ public BaseArcticFileScanTask( this.baseFile = baseFile; this.posDeleteFiles = posDeleteFiles == null ? Collections.emptyList() : posDeleteFiles.stream().filter(s -> { - DefaultKeyedFile.FileMeta fileMeta = DefaultKeyedFile.FileMeta.parseBase(s.path().toString()); + DefaultKeyedFile.FileMeta fileMeta = FileNameHandle.parseBase(s.path().toString()); return fileMeta.node().index() == baseFile.node().index() && fileMeta.node().mask() == baseFile.node().mask(); }).collect(Collectors.toList()); diff --git a/core/src/main/java/com/netease/arctic/scan/BaseChangeTableIncrementalScan.java b/core/src/main/java/com/netease/arctic/scan/BaseChangeTableIncrementalScan.java index 31eddfe155..e3c2765010 100644 --- a/core/src/main/java/com/netease/arctic/scan/BaseChangeTableIncrementalScan.java +++ b/core/src/main/java/com/netease/arctic/scan/BaseChangeTableIncrementalScan.java @@ -20,6 +20,7 @@ import com.netease.arctic.IcebergFileEntry; import com.netease.arctic.data.DefaultKeyedFile; +import com.netease.arctic.io.FileNameHandle; import com.netease.arctic.table.ChangeTable; import com.netease.arctic.table.TableProperties; import com.netease.arctic.utils.TableFileUtils; @@ -88,7 +89,8 @@ public CloseableIterable planTasks(PartitionDataFilter shoul Boolean shouldKeep = shouldKeepFile.shouldKeep(partition, sequenceNumber); if (shouldKeep == null) { String filePath = entry.getFile().path().toString(); - return shouldKeepFileWithLegacyTxId.shouldKeep(partition, TableFileUtils.parseFileTidFromFileName(filePath)); + return shouldKeepFileWithLegacyTxId.shouldKeep(partition, FileNameHandle.parseChange(filePath, + sequenceNumber).transactionId()); } else { return shouldKeep; } diff --git a/core/src/main/java/com/netease/arctic/utils/ConvertStructUtil.java b/core/src/main/java/com/netease/arctic/utils/ConvertStructUtil.java index 342357cfcd..f2f270a41e 100644 --- a/core/src/main/java/com/netease/arctic/utils/ConvertStructUtil.java +++ b/core/src/main/java/com/netease/arctic/utils/ConvertStructUtil.java @@ -23,6 +23,7 @@ import com.netease.arctic.ams.api.properties.MetaTableProperties; import com.netease.arctic.data.DataFileType; import com.netease.arctic.data.DefaultKeyedFile; +import com.netease.arctic.io.FileNameHandle; import com.netease.arctic.table.ArcticTable; import com.netease.arctic.table.PrimaryKeySpec; import com.netease.arctic.table.TableIdentifier; @@ -72,13 +73,13 @@ public static com.netease.arctic.ams.api.DataFile convertToAmsDatafile( */ FileContent content = dataFile.content(); if (content == FileContent.DATA) { - DefaultKeyedFile.FileMeta fileMeta = DefaultKeyedFile.FileMeta.parseBase(dataFile.path().toString()); + DefaultKeyedFile.FileMeta fileMeta = FileNameHandle.parseBase(dataFile.path().toString()); validateArcticFileType(content, dataFile.path().toString(), fileMeta.type()); amsDataFile.setFileType(fileMeta.type().name()); amsDataFile.setIndex(fileMeta.node().index()); amsDataFile.setMask(fileMeta.node().mask()); } else if (content == FileContent.POSITION_DELETES) { - DefaultKeyedFile.FileMeta fileMeta = DefaultKeyedFile.FileMeta.parseBase(dataFile.path().toString()); + DefaultKeyedFile.FileMeta fileMeta = FileNameHandle.parseBase(dataFile.path().toString()); amsDataFile.setFileType(DataFileType.POS_DELETE_FILE.name()); if (fileMeta.type() == DataFileType.POS_DELETE_FILE || fileMeta.type() == DataFileType.BASE_FILE) { amsDataFile.setIndex(fileMeta.node().index()); diff --git a/core/src/main/java/com/netease/arctic/utils/TableFileUtils.java b/core/src/main/java/com/netease/arctic/utils/TableFileUtils.java index ff6c4322c6..97b6f65603 100644 --- a/core/src/main/java/com/netease/arctic/utils/TableFileUtils.java +++ b/core/src/main/java/com/netease/arctic/utils/TableFileUtils.java @@ -37,9 +37,6 @@ public class TableFileUtils { private static final Logger LOG = LoggerFactory.getLogger(TableFileUtils.class); - private static final String KEYED_FILE_NAME_PATTERN_STRING = "(\\d+)-(\\w+)-(\\d+)-(\\d+)-(\\d+)-(\\d+)\\.\\w+"; - private static final Pattern KEYED_FILE_NAME_PATTERN = Pattern.compile(KEYED_FILE_NAME_PATTERN_STRING); - /** * Parse file name form file path * @@ -105,51 +102,6 @@ public static String getNewFilePath(String newDirectory, String filePath) { return newDirectory + File.separator + getFileName(filePath); } - /** - * parse keyed file type from file name - * @param fileName fileName - * @return DataFileType - */ - public static DataFileType parseFileTypeFromFileName(String fileName) { - fileName = TableFileUtils.getFileName(fileName); - Matcher matcher = KEYED_FILE_NAME_PATTERN.matcher(fileName); - DataFileType type = DataFileType.BASE_FILE; - if (matcher.matches()) { - type = DataFileType.ofShortName(matcher.group(2)); - } - return type; - } - - /** - * parse keyed file transaction id from file name - * @param fileName fileName - * @return transaction id - */ - public static long parseFileTidFromFileName(String fileName) { - fileName = TableFileUtils.getFileName(fileName); - Matcher matcher = KEYED_FILE_NAME_PATTERN.matcher(fileName); - long transactionId = 0L; - if (matcher.matches()) { - transactionId = Long.parseLong(matcher.group(3)); - } - return transactionId; - } - - /** - * parse keyed file node id from file name - * @param fileName fileName - * @return node id - */ - public static DataTreeNode parseFileNodeFromFileName(String fileName) { - fileName = TableFileUtils.getFileName(fileName); - Matcher matcher = KEYED_FILE_NAME_PATTERN.matcher(fileName); - long nodeId = 1; - if (matcher.matches()) { - nodeId = Long.parseLong(matcher.group(1)); - } - return DataTreeNode.ofId(nodeId); - } - /** * remove Uniform Resource Identifier (URI) in file path * @param path file path with Uniform Resource Identifier (URI) diff --git a/core/src/test/java/com/netease/arctic/data/KeyedDataFileTest.java b/core/src/test/java/com/netease/arctic/data/KeyedDataFileTest.java index 135a8f2f20..6dceb857b7 100644 --- a/core/src/test/java/com/netease/arctic/data/KeyedDataFileTest.java +++ b/core/src/test/java/com/netease/arctic/data/KeyedDataFileTest.java @@ -37,7 +37,7 @@ public void testDefaultKeyedFile() { List writeFiles = writeChange(PK_TABLE_ID, ChangeAction.INSERT, writeRecords()); Assert.assertEquals(1, writeFiles.size()); - DefaultKeyedFile defaultKeyedFile = new DefaultKeyedFile(writeFiles.get(0)); + DefaultKeyedFile defaultKeyedFile = DefaultKeyedFile.parseChange(writeFiles.get(0), 0L); Assert.assertEquals(DataFileType.INSERT_FILE, defaultKeyedFile.type()); Assert.assertEquals(3, defaultKeyedFile.node().mask()); Assert.assertEquals(0, defaultKeyedFile.node().index()); diff --git a/core/src/test/java/com/netease/arctic/io/FileNameHandleTest.java b/core/src/test/java/com/netease/arctic/io/FileNameHandleTest.java new file mode 100644 index 0000000000..39a9657cb8 --- /dev/null +++ b/core/src/test/java/com/netease/arctic/io/FileNameHandleTest.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.netease.arctic.io; + +import com.netease.arctic.data.DataFileType; +import com.netease.arctic.data.DataTreeNode; +import com.netease.arctic.data.DefaultKeyedFile; +import com.netease.arctic.io.writer.TaskWriterKey; +import org.apache.iceberg.FileFormat; +import org.junit.Assert; +import org.junit.Test; + +public class FileNameHandleTest { + + @Test + public void newBaseFileName() { + FileNameHandle fileNameHandle = new FileNameHandle(FileFormat.PARQUET, 0, 1L, 2L); + TaskWriterKey writerKey = new TaskWriterKey(null, DataTreeNode.of(3, 3), DataFileType.BASE_FILE); + String fileName = fileNameHandle.fileName(writerKey); + System.out.println(fileName); + DefaultKeyedFile.FileMeta fileMeta = FileNameHandle.parseBase(fileName); + Assert.assertEquals(fileMeta.node(), DataTreeNode.of(3, 3)); + Assert.assertEquals(fileMeta.type(), DataFileType.BASE_FILE); + Assert.assertEquals(fileMeta.transactionId(), 2L); + } + + @Test + public void hiveFileName() { + DefaultKeyedFile.FileMeta fileMeta = FileNameHandle.parseBase("a"); + Assert.assertEquals(fileMeta.node(), DataTreeNode.of(0, 0)); + Assert.assertEquals(fileMeta.type(), DataFileType.BASE_FILE); + Assert.assertEquals(fileMeta.transactionId(), 0L); + } + + @Test + public void flinkChangeFile2Base() { + FileNameHandle fileNameHandle = new FileNameHandle(FileFormat.PARQUET, 0, 1L, null); + TaskWriterKey writerKey = new TaskWriterKey(null, DataTreeNode.of(3, 3), DataFileType.INSERT_FILE); + String fileName = fileNameHandle.fileName(writerKey); + DefaultKeyedFile.FileMeta fileMeta = FileNameHandle.parseBase(fileName); + Assert.assertEquals(fileMeta.node(), DataTreeNode.of(3, 3)); + Assert.assertEquals(fileMeta.type(), DataFileType.BASE_FILE); + Assert.assertEquals(fileMeta.transactionId(), 0L); + } + + @Test + public void flinkChangeFile() { + FileNameHandle fileNameHandle = new FileNameHandle(FileFormat.PARQUET, 0, 1L, null); + TaskWriterKey writerKey = new TaskWriterKey(null, DataTreeNode.of(3, 3), DataFileType.INSERT_FILE); + String fileName = fileNameHandle.fileName(writerKey); + DefaultKeyedFile.FileMeta fileMeta = FileNameHandle.parseChange(fileName, 5L); + Assert.assertEquals(fileMeta.node(), DataTreeNode.of(3, 3)); + Assert.assertEquals(fileMeta.type(), DataFileType.INSERT_FILE); + Assert.assertEquals(fileMeta.transactionId(), 5L); + } + + @Test + public void sparkChangeFile() { + FileNameHandle fileNameHandle = new FileNameHandle(FileFormat.PARQUET, 0, 1L, 5L); + TaskWriterKey writerKey = new TaskWriterKey(null, DataTreeNode.of(3, 3), DataFileType.INSERT_FILE); + String fileName = fileNameHandle.fileName(writerKey); + DefaultKeyedFile.FileMeta fileMeta = FileNameHandle.parseChange(fileName, 6L); + Assert.assertEquals(fileMeta.node(), DataTreeNode.of(3, 3)); + Assert.assertEquals(fileMeta.type(), DataFileType.INSERT_FILE); + Assert.assertEquals(fileMeta.transactionId(), 5L); + } + + @Test + public void adaptOldFileName() { + DefaultKeyedFile.FileMeta fileMeta = FileNameHandle.parseChange("hdfs://easyops-sloth/user/warehouse/animal_partition_two/base/5-I-2-00000-941953957-0000000001.parquet", 6L); + Assert.assertEquals(fileMeta.node(), DataTreeNode.of(3, 1)); + Assert.assertEquals(fileMeta.type(), DataFileType.INSERT_FILE); + Assert.assertEquals(fileMeta.transactionId(), 2L); + } +} diff --git a/core/src/test/java/com/netease/arctic/io/TableTestBaseWithInitData.java b/core/src/test/java/com/netease/arctic/io/TableTestBaseWithInitData.java index bba3e61ee0..15898fac38 100644 --- a/core/src/test/java/com/netease/arctic/io/TableTestBaseWithInitData.java +++ b/core/src/test/java/com/netease/arctic/io/TableTestBaseWithInitData.java @@ -84,7 +84,6 @@ public void initData() throws IOException { WriteResult result = writer.complete(); AppendFiles changeAppend = testKeyedTable.changeTable().newAppend(); Arrays.stream(result.dataFiles()) - .map(DefaultKeyedFile::new) .forEach(changeAppend::appendFile); changeAppend.commit(); } @@ -99,7 +98,6 @@ public void initData() throws IOException { WriteResult result = writer.complete(); AppendFiles changeAppend = testKeyedTable.changeTable().newAppend(); Arrays.stream(result.dataFiles()) - .map(DefaultKeyedFile::new) .forEach(changeAppend::appendFile); changeAppend.commit(); } diff --git a/core/src/test/java/com/netease/arctic/scan/TableEntriesScanTest.java b/core/src/test/java/com/netease/arctic/scan/TableEntriesScanTest.java index eef2db3e91..5ea8f3e6cd 100644 --- a/core/src/test/java/com/netease/arctic/scan/TableEntriesScanTest.java +++ b/core/src/test/java/com/netease/arctic/scan/TableEntriesScanTest.java @@ -20,6 +20,7 @@ import com.netease.arctic.IcebergFileEntry; import com.netease.arctic.data.DataFileType; +import com.netease.arctic.io.FileNameHandle; import com.netease.arctic.io.TableTestBaseWithInitData; import com.netease.arctic.utils.TableFileUtils; import org.apache.iceberg.ContentFile; @@ -45,7 +46,7 @@ public void testScanDataEntries() { for (IcebergFileEntry entry : dataFileScan.entries()) { cnt++; DataFile file = (DataFile) entry.getFile(); - DataFileType dataFileType = TableFileUtils.parseFileTypeFromFileName(file.path().toString()); + DataFileType dataFileType = FileNameHandle.parseFileTypeForChange(file.path().toString()); if (dataFileType == DataFileType.INSERT_FILE) { Assert.assertEquals(1, entry.getSequenceNumber()); } else if (dataFileType == DataFileType.EQ_DELETE_FILE) { @@ -112,7 +113,7 @@ public void testScanEntriesWithFilter() { for (IcebergFileEntry entry : dataFileScan.entries()) { cnt++; DataFile file = (DataFile) entry.getFile(); - DataFileType dataFileType = TableFileUtils.parseFileTypeFromFileName(file.path().toString()); + DataFileType dataFileType = FileNameHandle.parseFileTypeForChange(file.path().toString()); if (dataFileType == DataFileType.INSERT_FILE) { Assert.assertEquals(1, entry.getSequenceNumber()); } else if (dataFileType == DataFileType.EQ_DELETE_FILE) { diff --git a/core/src/test/java/com/netease/arctic/utils/FileUtilTest.java b/core/src/test/java/com/netease/arctic/utils/FileUtilTest.java index e79eff3c88..ae6b5d75f7 100644 --- a/core/src/test/java/com/netease/arctic/utils/FileUtilTest.java +++ b/core/src/test/java/com/netease/arctic/utils/FileUtilTest.java @@ -18,9 +18,6 @@ package com.netease.arctic.utils; -import com.netease.arctic.data.DataFileType; -import com.netease.arctic.data.DataTreeNode; -import com.netease.arctic.data.DefaultKeyedFile; import org.junit.Assert; import org.junit.Test; @@ -40,20 +37,6 @@ public void getFileDir() { Assert.assertEquals("hdfs://easyops-sloth/user/warehouse/animal_partition_two/base/opt_mon=202109/opt_day=26", fileDir); } - @Test - public void testParseFileName() { - String fileName = - "hdfs://easyops-sloth/user/warehouse/animal_partition_two/base/5-I-2-00000-941953957-0000000001.parquet"; - DefaultKeyedFile.FileMeta fileMeta = TableFileUtils.parseFileMetaFromFileName(fileName); - Assert.assertEquals(DataFileType.INSERT_FILE, fileMeta.type()); - Assert.assertEquals(DataTreeNode.of(3,1), fileMeta.node()); - Assert.assertEquals(2, fileMeta.transactionId()); - - Assert.assertEquals(DataFileType.INSERT_FILE, TableFileUtils.parseFileTypeFromFileName(fileName)); - Assert.assertEquals(DataTreeNode.of(3,1), TableFileUtils.parseFileNodeFromFileName(fileName)); - Assert.assertEquals(2, TableFileUtils.parseFileTidFromFileName(fileName)); - } - @Test public void testGetUriPath() { Assert.assertEquals("/a/b/c", TableFileUtils.getUriPath("hdfs://xxxxx/a/b/c")); From eafd18041ecef99311de94be0fbaa68f6ee6b827 Mon Sep 17 00:00:00 2001 From: shidayang <530847445@qq.com> Date: Wed, 18 Jan 2023 17:12:32 +0800 Subject: [PATCH 2/4] Adapt new Transaction model --- .../io/writer/CommonOutputFileFactory.java | 17 ++++------------- .../io/writer/AdaptHiveOutputFileFactory.java | 17 +++++------------ 2 files changed, 9 insertions(+), 25 deletions(-) diff --git a/core/src/main/java/com/netease/arctic/io/writer/CommonOutputFileFactory.java b/core/src/main/java/com/netease/arctic/io/writer/CommonOutputFileFactory.java index 3999c85613..32df6d2d7f 100644 --- a/core/src/main/java/com/netease/arctic/io/writer/CommonOutputFileFactory.java +++ b/core/src/main/java/com/netease/arctic/io/writer/CommonOutputFileFactory.java @@ -19,6 +19,7 @@ package com.netease.arctic.io.writer; import com.netease.arctic.io.ArcticFileIO; +import com.netease.arctic.io.FileNameHandle; import com.netease.arctic.utils.IdGenerator; import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; @@ -48,12 +49,7 @@ public class CommonOutputFileFactory implements OutputFileFactory { private final FileFormat format; private final ArcticFileIO io; private final EncryptionManager encryptionManager; - private final int partitionId; - private final long taskId; - private final long transactionId; - private final String operationId; - - private final AtomicLong fileCount = new AtomicLong(0); + private final FileNameHandle fileNameHandle; public CommonOutputFileFactory(String baseLocation, PartitionSpec partitionSpec, FileFormat format, ArcticFileIO io, EncryptionManager encryptionManager, @@ -63,16 +59,11 @@ public CommonOutputFileFactory(String baseLocation, PartitionSpec partitionSpec, this.format = format; this.io = io; this.encryptionManager = encryptionManager; - this.partitionId = partitionId; - this.taskId = taskId; - this.transactionId = transactionId == null ? 0 : transactionId; - this.operationId = transactionId == null ? IdGenerator.randomId() + "" : "0"; + this.fileNameHandle = new FileNameHandle(format, partitionId, taskId, transactionId); } private String generateFilename(TaskWriterKey key) { - return format.addExtension( - String.format("%d-%s-%d-%05d-%d-%s-%05d", key.getTreeNode().getId(), key.getFileType().shortName(), - transactionId, partitionId, taskId, operationId, fileCount.incrementAndGet())); + return fileNameHandle.fileName(key); } private String fileLocation(StructLike partitionData, String fileName) { diff --git a/hive/src/main/java/com/netease/arctic/hive/io/writer/AdaptHiveOutputFileFactory.java b/hive/src/main/java/com/netease/arctic/hive/io/writer/AdaptHiveOutputFileFactory.java index a6389cdd43..79ccf1cbeb 100644 --- a/hive/src/main/java/com/netease/arctic/hive/io/writer/AdaptHiveOutputFileFactory.java +++ b/hive/src/main/java/com/netease/arctic/hive/io/writer/AdaptHiveOutputFileFactory.java @@ -20,6 +20,7 @@ import com.netease.arctic.hive.utils.HiveTableUtil; import com.netease.arctic.io.ArcticFileIO; +import com.netease.arctic.io.FileNameHandle; import com.netease.arctic.io.writer.OutputFileFactory; import com.netease.arctic.io.writer.TaskWriterKey; import com.netease.arctic.utils.IdGenerator; @@ -66,10 +67,7 @@ public class AdaptHiveOutputFileFactory implements OutputFileFactory { private final FileFormat format; private final ArcticFileIO io; private final EncryptionManager encryptionManager; - private final int partitionId; - private final long taskId; - private final long transactionId; - private final String operationId; + private final FileNameHandle fileNameHandle; private final AtomicLong fileCount = new AtomicLong(0); @@ -100,21 +98,16 @@ public AdaptHiveOutputFileFactory( this.format = format; this.io = io; this.encryptionManager = encryptionManager; - this.partitionId = partitionId; - this.taskId = taskId; - this.transactionId = transactionId == null ? 0 : transactionId; - this.operationId = transactionId == null ? IdGenerator.randomId() + "" : "0"; if (hiveSubDirectory == null) { - this.hiveSubDirectory = HiveTableUtil.newHiveSubdirectory(this.transactionId); + this.hiveSubDirectory = HiveTableUtil.newHiveSubdirectory(transactionId); } else { this.hiveSubDirectory = hiveSubDirectory; } + this.fileNameHandle = new FileNameHandle(format, partitionId, taskId, transactionId); } private String generateFilename(TaskWriterKey key) { - return format.addExtension( - String.format("%d-%s-%d-%05d-%d-%s-%05d", key.getTreeNode().getId(), key.getFileType().shortName(), - transactionId, partitionId, taskId, operationId, fileCount.incrementAndGet())); + return fileNameHandle.fileName(key); } private String fileLocation(StructLike partitionData, String fileName) { From 4a24f215a5454da6e95504bdca06f272e58310a3 Mon Sep 17 00:00:00 2001 From: shidayang <530847445@qq.com> Date: Wed, 18 Jan 2023 17:14:50 +0800 Subject: [PATCH 3/4] Adapt new Transaction model --- .../arctic/hive/io/writer/AdaptHiveOutputFileFactory.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/hive/src/main/java/com/netease/arctic/hive/io/writer/AdaptHiveOutputFileFactory.java b/hive/src/main/java/com/netease/arctic/hive/io/writer/AdaptHiveOutputFileFactory.java index 79ccf1cbeb..19e617cc8d 100644 --- a/hive/src/main/java/com/netease/arctic/hive/io/writer/AdaptHiveOutputFileFactory.java +++ b/hive/src/main/java/com/netease/arctic/hive/io/writer/AdaptHiveOutputFileFactory.java @@ -69,8 +69,6 @@ public class AdaptHiveOutputFileFactory implements OutputFileFactory { private final EncryptionManager encryptionManager; private final FileNameHandle fileNameHandle; - private final AtomicLong fileCount = new AtomicLong(0); - public AdaptHiveOutputFileFactory( String baseLocation, PartitionSpec partitionSpec, From 2f95ac9617b8ce0918896cfecee865d1099526b0 Mon Sep 17 00:00:00 2001 From: shidayang <530847445@qq.com> Date: Wed, 18 Jan 2023 17:15:58 +0800 Subject: [PATCH 4/4] Adapt new Transaction model --- .../com/netease/arctic/io/writer/CommonOutputFileFactory.java | 2 -- .../arctic/hive/io/writer/AdaptHiveOutputFileFactory.java | 2 -- 2 files changed, 4 deletions(-) diff --git a/core/src/main/java/com/netease/arctic/io/writer/CommonOutputFileFactory.java b/core/src/main/java/com/netease/arctic/io/writer/CommonOutputFileFactory.java index 32df6d2d7f..256e973c39 100644 --- a/core/src/main/java/com/netease/arctic/io/writer/CommonOutputFileFactory.java +++ b/core/src/main/java/com/netease/arctic/io/writer/CommonOutputFileFactory.java @@ -46,7 +46,6 @@ public class CommonOutputFileFactory implements OutputFileFactory { private final String baseLocation; private final PartitionSpec partitionSpec; - private final FileFormat format; private final ArcticFileIO io; private final EncryptionManager encryptionManager; private final FileNameHandle fileNameHandle; @@ -56,7 +55,6 @@ public CommonOutputFileFactory(String baseLocation, PartitionSpec partitionSpec, int partitionId, long taskId, Long transactionId) { this.baseLocation = baseLocation; this.partitionSpec = partitionSpec; - this.format = format; this.io = io; this.encryptionManager = encryptionManager; this.fileNameHandle = new FileNameHandle(format, partitionId, taskId, transactionId); diff --git a/hive/src/main/java/com/netease/arctic/hive/io/writer/AdaptHiveOutputFileFactory.java b/hive/src/main/java/com/netease/arctic/hive/io/writer/AdaptHiveOutputFileFactory.java index 19e617cc8d..514cd13a3c 100644 --- a/hive/src/main/java/com/netease/arctic/hive/io/writer/AdaptHiveOutputFileFactory.java +++ b/hive/src/main/java/com/netease/arctic/hive/io/writer/AdaptHiveOutputFileFactory.java @@ -64,7 +64,6 @@ public class AdaptHiveOutputFileFactory implements OutputFileFactory { private final String baseLocation; private final String hiveSubDirectory; private final PartitionSpec partitionSpec; - private final FileFormat format; private final ArcticFileIO io; private final EncryptionManager encryptionManager; private final FileNameHandle fileNameHandle; @@ -93,7 +92,6 @@ public AdaptHiveOutputFileFactory( String hiveSubDirectory) { this.baseLocation = baseLocation; this.partitionSpec = partitionSpec; - this.format = format; this.io = io; this.encryptionManager = encryptionManager; if (hiveSubDirectory == null) {