Skip to content

Commit

Permalink
Merge pull request #4 from shidayang/fix-994-1
Browse files Browse the repository at this point in the history
Adapt new Transaction model
  • Loading branch information
wangtaohz authored Jan 18, 2023
2 parents 8947395 + 2f95ac9 commit 8405d54
Show file tree
Hide file tree
Showing 17 changed files with 281 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.netease.arctic.ams.server.utils.ContentFileUtil;
import com.netease.arctic.data.DataFileType;
import com.netease.arctic.data.DataTreeNode;
import com.netease.arctic.io.FileNameHandle;
import com.netease.arctic.table.ArcticTable;
import com.netease.arctic.table.TableProperties;
import com.netease.arctic.table.UnkeyedTable;
Expand Down Expand Up @@ -219,12 +220,12 @@ private List<BaseOptimizeTask> collectKeyedTableTasks(String partition, FileTree
if (!baseFiles.isEmpty()) {
List<DataTreeNode> sourceNodes = Collections.singletonList(subTree.getNode());
Set<DataTreeNode> baseFileNodes = baseFiles.stream()
.map(dataFile -> TableFileUtils.parseFileNodeFromFileName(dataFile.path().toString()))
.map(dataFile -> FileNameHandle.parseFileNodeFromFileName(dataFile.path().toString()))
.collect(Collectors.toSet());
List<DeleteFile> posDeleteFiles = partitionPosDeleteFiles
.computeIfAbsent(partition, e -> Collections.emptyList()).stream()
.filter(deleteFile ->
baseFileNodes.contains(TableFileUtils.parseFileNodeFromFileName(deleteFile.path().toString())))
baseFileNodes.contains(FileNameHandle.parseFileNodeFromFileName(deleteFile.path().toString())))
.collect(Collectors.toList());

if (nodeTaskNeedBuild(posDeleteFiles, baseFiles)) {
Expand Down Expand Up @@ -288,7 +289,7 @@ private void addBaseFileIntoFileTree() {

private long getMaxTransactionId(List<DataFile> dataFiles) {
OptionalLong maxTransactionId = dataFiles.stream()
.mapToLong(file -> TableFileUtils.parseFileTidFromFileName(file.path().toString())).max();
.mapToLong(file -> FileNameHandle.parseBase(file.path().toString()).transactionId()).max();
if (maxTransactionId.isPresent()) {
return maxTransactionId.getAsLong();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.netease.arctic.ams.server.utils.ContentFileUtil;
import com.netease.arctic.data.DataFileType;
import com.netease.arctic.data.DataTreeNode;
import com.netease.arctic.io.FileNameHandle;
import com.netease.arctic.table.ArcticTable;
import com.netease.arctic.table.KeyedTable;
import com.netease.arctic.table.TableProperties;
Expand Down Expand Up @@ -247,7 +248,8 @@ public long getTransactionId() {

public long getLegacyTransactionId() {
if (legacyTransactionId == -1) {
legacyTransactionId = TableFileUtils.parseFileTidFromFileName(dataFileInfo.getPath());
legacyTransactionId = FileNameHandle.parseChange(dataFileInfo.getPath(),
dataFileInfo.getSequence()).transactionId();
}
return legacyTransactionId;
}
Expand Down Expand Up @@ -338,12 +340,12 @@ private List<BaseOptimizeTask> collectKeyedTableTasks(String partition, FileTree
sourceNodes = Collections.singletonList(subTree.getNode());
}
Set<DataTreeNode> baseFileNodes = baseFiles.stream()
.map(dataFile -> TableFileUtils.parseFileNodeFromFileName(dataFile.path().toString()))
.map(dataFile -> FileNameHandle.parseFileNodeFromFileName(dataFile.path().toString()))
.collect(Collectors.toSet());
List<DeleteFile> posDeleteFiles = partitionPosDeleteFiles
.computeIfAbsent(partition, e -> Collections.emptyList()).stream()
.filter(deleteFile ->
baseFileNodes.contains(TableFileUtils.parseFileNodeFromFileName(deleteFile.path().toString())))
baseFileNodes.contains(FileNameHandle.parseFileNodeFromFileName(deleteFile.path().toString())))
.collect(Collectors.toList());
// if no insert files and no eq-delete file, skip
if (CollectionUtils.isEmpty(insertFiles) && CollectionUtils.isEmpty(deleteFiles)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.netease.arctic.hive.utils.HivePartitionUtil;
import com.netease.arctic.hive.utils.HiveTableUtil;
import com.netease.arctic.hive.utils.TableTypeUtil;
import com.netease.arctic.io.FileNameHandle;
import com.netease.arctic.table.ArcticTable;
import com.netease.arctic.utils.IdGenerator;
import com.netease.arctic.utils.SerializationUtils;
Expand Down Expand Up @@ -82,7 +83,7 @@ public boolean commit(long baseSnapshotId) throws Exception {
.map(fileByte -> (DataFile) SerializationUtils.toInternalTableFile(fileByte))
.collect(Collectors.toList());
long maxTransactionId = targetFiles.stream()
.mapToLong(dataFile -> TableFileUtils.parseFileTidFromFileName(dataFile.path().toString()))
.mapToLong(dataFile -> FileNameHandle.parseBase(dataFile.path().toString()).transactionId())
.max()
.orElse(0L);

Expand Down
3 changes: 3 additions & 0 deletions core/src/main/java/com/netease/arctic/data/DataTreeNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public final class DataTreeNode implements Serializable {
public static final DataTreeNode ROOT = new DataTreeNode(0, 0);

// Static factory for a tree node addressed by (mask, index).
// A node's index must not exceed its mask — presumably mask is of the
// form 2^n - 1 for level n (ROOT is (0, 0)), so valid indexes are
// 0..mask. NOTE(review): the mask shape itself is not validated here —
// confirm callers only pass 2^n - 1 masks.
public static DataTreeNode of(long mask, long index) {
if (index > mask) {
throw new IllegalArgumentException("index can not be greater than mask");
}
return new DataTreeNode(mask, index);
}

Expand Down
53 changes: 4 additions & 49 deletions core/src/main/java/com/netease/arctic/data/DefaultKeyedFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package com.netease.arctic.data;

import com.netease.arctic.io.FileNameHandle;
import com.netease.arctic.utils.TableFileUtils;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileFormat;
Expand Down Expand Up @@ -47,12 +48,12 @@ private DefaultKeyedFile(DataFile internalFile, FileMeta meta) {
}

public static DefaultKeyedFile parseChange(DataFile dataFile, Long sequenceNumber) {
FileMeta fileMeta = FileMeta.parseChange(dataFile.path().toString(), sequenceNumber);
FileMeta fileMeta = FileNameHandle.parseChange(dataFile.path().toString(), sequenceNumber);
return new DefaultKeyedFile(dataFile, fileMeta);
}

public static DefaultKeyedFile parseBase(DataFile dataFile) {
FileMeta fileMeta = FileMeta.parseBase(dataFile.path().toString());
FileMeta fileMeta = FileNameHandle.parseBase(dataFile.path().toString());
return new DefaultKeyedFile(dataFile, fileMeta);
}

Expand Down Expand Up @@ -175,14 +176,11 @@ public int hashCode() {

public static class FileMeta {

private static final String KEYED_FILE_NAME_PATTERN_STRING = "(\\d+)-(\\w+)-(\\d+)-(\\d+)-(\\d+)-(\\d+)\\.\\w+";
private static final Pattern KEYED_FILE_NAME_PATTERN = Pattern.compile(KEYED_FILE_NAME_PATTERN_STRING);

private final long transactionId;
private final DataFileType type;
private final DataTreeNode node;

private FileMeta(Long transactionId, DataFileType type, DataTreeNode node) {
public FileMeta(Long transactionId, DataFileType type, DataTreeNode node) {
this.transactionId = transactionId;
this.type = type;
this.node = node;
Expand All @@ -199,48 +197,5 @@ public DataFileType type() {
public DataTreeNode node() {
return node;
}

/**
* Flink write transactionId as 0.
* if we get transactionId from path is 0, we set transactionId as iceberg sequenceNumber.
* @param path file path
* @param sequenceNumber iceberg sequenceNumber
*/
public static FileMeta parseChange(String path, Long sequenceNumber) {
String fileName = TableFileUtils.getFileName(path);
Matcher matcher = KEYED_FILE_NAME_PATTERN.matcher(fileName);
long nodeId = 1;
DataFileType type = null;
long transactionId = 0L;
if (matcher.matches()) {
nodeId = Long.parseLong(matcher.group(1));
type = DataFileType.ofShortName(matcher.group(2));
transactionId = Long.parseLong(matcher.group(3));
transactionId = transactionId == 0 ? sequenceNumber : transactionId;
}
DataTreeNode node = DataTreeNode.ofId(nodeId);
return new DefaultKeyedFile.FileMeta(transactionId, type, node);
}

/**
* Path writen by hive can not be pared by arctic format. so we set it transactionId as 0.
* @param path file path
*/
public static FileMeta parseBase(String path) {
String fileName = TableFileUtils.getFileName(path);
Matcher matcher = KEYED_FILE_NAME_PATTERN.matcher(fileName);
long nodeId = 1;
DataFileType type = DataFileType.BASE_FILE;
long transactionId = 0L;
if (matcher.matches()) {
nodeId = Long.parseLong(matcher.group(1));
type = DataFileType.ofShortName(matcher.group(2));
transactionId = Long.parseLong(matcher.group(3));
} else {
transactionId = 0;
}
DataTreeNode node = DataTreeNode.ofId(nodeId);
return new DefaultKeyedFile.FileMeta(transactionId, type, node);
}
}
}
151 changes: 151 additions & 0 deletions core/src/main/java/com/netease/arctic/io/FileNameHandle.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netease.arctic.io;

import com.netease.arctic.data.DataFileType;
import com.netease.arctic.data.DataTreeNode;
import com.netease.arctic.data.DefaultKeyedFile;
import com.netease.arctic.io.writer.TaskWriterKey;
import com.netease.arctic.utils.IdGenerator;
import com.netease.arctic.utils.TableFileUtils;
import org.apache.iceberg.FileFormat;

import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Generates and parses keyed-table data file names.
 *
 * <p>Generated names follow the layout
 * {@code <nodeId>-<fileType>-<txId>-<partitionId>-<taskId>-<operationId>-<count>.<ext>}.
 * The parse pattern only pins the first five numeric/word fields and accepts any
 * tail ({@code -.*}), so names produced by older layouts can still be parsed.
 */
public class FileNameHandle {

  private static final String KEYED_FILE_NAME_PATTERN_STRING = "(\\d+)-(\\w+)-(\\d+)-(\\d+)-(\\d+)-.*";
  private static final Pattern KEYED_FILE_NAME_PATTERN = Pattern.compile(KEYED_FILE_NAME_PATTERN_STRING);

  private static final String FORMAT = "%d-%s-%d-%05d-%d-%s-%05d";

  private final FileFormat fileFormat;
  private final int partitionId;
  private final long taskId;
  private final long transactionId;

  // Random operation id avoids duplicated file names when several writers share
  // the same (transactionId, partitionId, taskId) triple.
  private final String operationId = IdGenerator.randomId() + "";
  private final AtomicLong fileCount = new AtomicLong(0);

  /**
   * Creates a handle that generates file names for one writer.
   *
   * @param fileFormat    format used to append the file extension
   * @param partitionId   id of the writing partition
   * @param taskId        id of the writing task; must not be null
   * @param transactionId transaction id embedded in names; {@code null} is treated as 0
   */
  public FileNameHandle(
      FileFormat fileFormat,
      int partitionId,
      Long taskId,
      Long transactionId) {
    this.fileFormat = fileFormat;
    this.partitionId = partitionId;
    this.taskId = taskId;
    this.transactionId = transactionId == null ? 0 : transactionId;
  }

  /**
   * Returns the next unique file name (with extension) for the given writer key.
   *
   * @param key writer key carrying the tree node and file type to encode
   */
  public String fileName(TaskWriterKey key) {
    return fileFormat.addExtension(
        String.format(FORMAT, key.getTreeNode().getId(), key.getFileType().shortName(),
            transactionId, partitionId, taskId, operationId, fileCount.incrementAndGet()));
  }

  /**
   * Parses the file meta of a change-store file.
   *
   * <p>Flink writes transactionId as 0; if the transactionId parsed from the path
   * is 0, the iceberg sequenceNumber is used instead.
   *
   * @param path           file path
   * @param sequenceNumber iceberg sequenceNumber used as the fallback transaction id;
   *                       a {@code null} fallback is treated as 0 instead of throwing NPE
   * @return parsed file meta; the type is {@code null} when the name does not match
   *         the keyed pattern
   */
  public static DefaultKeyedFile.FileMeta parseChange(String path, Long sequenceNumber) {
    Matcher matcher = matcherFor(path);
    long nodeId = 1;
    DataFileType type = null;
    long transactionId = 0L;
    if (matcher.matches()) {
      nodeId = Long.parseLong(matcher.group(1));
      type = DataFileType.ofShortName(matcher.group(2));
      transactionId = Long.parseLong(matcher.group(3));
      if (transactionId == 0) {
        // Guard the auto-unboxing: callers may have no sequence number at all.
        transactionId = sequenceNumber == null ? 0L : sequenceNumber;
      }
    }
    DataTreeNode node = DataTreeNode.ofId(nodeId);
    return new DefaultKeyedFile.FileMeta(transactionId, type, node);
  }

  /**
   * Parses the file meta of a base-store file.
   *
   * <p>Paths written by hive cannot be parsed as the arctic format, so their
   * transactionId is taken as 0 and their type defaults to BASE_FILE.
   * INSERT files already moved to the base store are reported as BASE_FILE.
   *
   * @param path file path
   */
  public static DefaultKeyedFile.FileMeta parseBase(String path) {
    Matcher matcher = matcherFor(path);
    long nodeId = 1;
    DataFileType type = DataFileType.BASE_FILE;
    long transactionId = 0L;
    if (matcher.matches()) {
      nodeId = Long.parseLong(matcher.group(1));
      type = toBaseType(DataFileType.ofShortName(matcher.group(2)));
      transactionId = Long.parseLong(matcher.group(3));
    }
    DataTreeNode node = DataTreeNode.ofId(nodeId);
    return new DefaultKeyedFile.FileMeta(transactionId, type, node);
  }

  /**
   * Parses the file type of a change-store file.
   *
   * @param path file path
   * @throws IllegalArgumentException if the name does not match the keyed pattern
   */
  public static DataFileType parseFileTypeForChange(String path) {
    Matcher matcher = matcherFor(path);
    if (!matcher.matches()) {
      // Include the offending path so failures are diagnosable from the message.
      throw new IllegalArgumentException("path is illegal: " + path);
    }
    return DataFileType.ofShortName(matcher.group(2));
  }

  /**
   * Parses the file type of a base-store file, defaulting to BASE_FILE for
   * unparseable names (e.g. files written by hive) and normalizing INSERT files
   * to BASE_FILE.
   *
   * @param path file path
   */
  public static DataFileType parseFileTypeForBase(String path) {
    Matcher matcher = matcherFor(path);
    DataFileType type = DataFileType.BASE_FILE;
    if (matcher.matches()) {
      type = toBaseType(DataFileType.ofShortName(matcher.group(2)));
    }
    return type;
  }

  /**
   * Parses the keyed-file tree node from a file name or path.
   *
   * @param fileName file name (a full path is also accepted)
   * @return the encoded tree node, or node id 1 when the name does not match
   */
  public static DataTreeNode parseFileNodeFromFileName(String fileName) {
    Matcher matcher = matcherFor(fileName);
    long nodeId = 1;
    if (matcher.matches()) {
      nodeId = Long.parseLong(matcher.group(1));
    }
    return DataTreeNode.ofId(nodeId);
  }

  // Builds a matcher over the bare file name extracted from the given path.
  private static Matcher matcherFor(String path) {
    return KEYED_FILE_NAME_PATTERN.matcher(TableFileUtils.getFileName(path));
  }

  // Base-store view of a file type: INSERT files are exposed as BASE files.
  private static DataFileType toBaseType(DataFileType type) {
    return type == DataFileType.INSERT_FILE ? DataFileType.BASE_FILE : type;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package com.netease.arctic.io.writer;

import com.netease.arctic.io.ArcticFileIO;
import com.netease.arctic.io.FileNameHandle;
import com.netease.arctic.utils.IdGenerator;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
Expand All @@ -45,34 +46,22 @@
public class CommonOutputFileFactory implements OutputFileFactory {
private final String baseLocation;
private final PartitionSpec partitionSpec;
private final FileFormat format;
private final ArcticFileIO io;
private final EncryptionManager encryptionManager;
private final int partitionId;
private final long taskId;
private final long transactionId;
private final String operationId;

private final AtomicLong fileCount = new AtomicLong(0);
private final FileNameHandle fileNameHandle;

public CommonOutputFileFactory(String baseLocation, PartitionSpec partitionSpec,
FileFormat format, ArcticFileIO io, EncryptionManager encryptionManager,
int partitionId, long taskId, Long transactionId) {
this.baseLocation = baseLocation;
this.partitionSpec = partitionSpec;
this.format = format;
this.io = io;
this.encryptionManager = encryptionManager;
this.partitionId = partitionId;
this.taskId = taskId;
this.transactionId = transactionId == null ? 0 : transactionId;
this.operationId = transactionId == null ? IdGenerator.randomId() + "" : "0";
this.fileNameHandle = new FileNameHandle(format, partitionId, taskId, transactionId);
}

private String generateFilename(TaskWriterKey key) {
return format.addExtension(
String.format("%d-%s-%d-%05d-%d-%s-%05d", key.getTreeNode().getId(), key.getFileType().shortName(),
transactionId, partitionId, taskId, operationId, fileCount.incrementAndGet()));
return fileNameHandle.fileName(key);
}

private String fileLocation(StructLike partitionData, String fileName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.netease.arctic.data.DefaultKeyedFile;
import com.netease.arctic.data.PrimaryKeyedFile;
import com.netease.arctic.io.FileNameHandle;
import com.netease.arctic.utils.TableFileUtils;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileScanTask;
Expand Down Expand Up @@ -56,7 +57,7 @@ public BaseArcticFileScanTask(
this.baseFile = baseFile;
this.posDeleteFiles = posDeleteFiles == null ?
Collections.emptyList() : posDeleteFiles.stream().filter(s -> {
DefaultKeyedFile.FileMeta fileMeta = DefaultKeyedFile.FileMeta.parseBase(s.path().toString());
DefaultKeyedFile.FileMeta fileMeta = FileNameHandle.parseBase(s.path().toString());
return fileMeta.node().index() == baseFile.node().index() &&
fileMeta.node().mask() == baseFile.node().mask();
}).collect(Collectors.toList());
Expand Down
Loading

0 comments on commit 8405d54

Please sign in to comment.