Commit cce6912

Merge the dev branch.
shaomengwang committed Jul 20, 2022
1 parent d5277b3 commit cce6912
Showing 497 changed files with 7,840 additions and 2,994 deletions.
@@ -76,15 +76,17 @@ public void open(FileInputSplit split) throws IOException {
         InputFile inputFile = new ParquetInputFile(new FilePath(split.getPath(), parquetFilePath.getFileSystem()));
         MessageType readSchema = ParquetUtil.getReadSchemaFromParquetFile(
             new FilePath(split.getPath(), parquetFilePath.getFileSystem()));
-
-        ParquetReadOptions options = ParquetReadOptions.builder().build();
-        ParquetFileReader fileReader = new ParquetFileReader(inputFile, options);
+        if (readSchema == null) {
+            skipThisSplit = true;
+        }
         if (skipThisSplit) {
             LOG.warn(
                 String.format(
                     "Escaped the file split [%s] due to mismatch of file schema to expected result schema",
                     split.getPath().toString()));
         } else {
+            ParquetReadOptions options = ParquetReadOptions.builder().build();
+            ParquetFileReader fileReader = new ParquetFileReader(inputFile, options);
             this.parquetRecordReader =
                 new ParquetRecordReader <>(
                     new RowReadSupport(),
@@ -10,6 +10,7 @@
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.types.Row;
 
+import com.alibaba.alink.common.exceptions.AkIllegalOperatorParameterException;
 import com.alibaba.alink.common.io.filesystem.FilePath;
 import com.alibaba.alink.common.io.parquet.ParquetReaderFactory;
 import com.alibaba.alink.common.io.parquet.plugin.ParquetUtil.ParquetInputFile;
@@ -57,10 +58,14 @@ public void open(FilePath filePath) {
 
     @Override
     public boolean reachedEnd() {
-        try {
-            return parquetRecordReader.reachEnd();
-        } catch (Exception e) {
-            throw new IllegalArgumentException("cannot read parquet file");
+        if (parquetRecordReader != null) {
+            try {
+                return parquetRecordReader.reachEnd();
+            } catch (Exception e) {
+                throw new AkIllegalOperatorParameterException("cannot read parquet file");
+            }
+        } else {
+            return true;
         }
     }
 
@@ -78,15 +83,16 @@ public void close() {
         try {
             parquetRecordReader.close();
         } catch (Exception e) {
-            throw new IllegalArgumentException("cannot close parquet file");
+            throw new AkIllegalOperatorParameterException("cannot close parquet file");
         }
     }
 
     @Override
     public TableSchema getTableSchemaFromParquetFile(FilePath filePath) {
         MessageType messageType = ParquetUtil.getReadSchemaFromParquetFile(filePath);
-        RowTypeInfo schema = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(messageType);
+        RowTypeInfo schema = messageType == null ? new RowTypeInfo()
+            : (RowTypeInfo) ParquetSchemaConverter.fromParquetType(messageType);
         return new TableSchema(schema.getFieldNames(), schema.getFieldTypes());
     }
 }
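
Note on the new exception types: this commit consistently swaps generic IllegalArgumentException/RuntimeException for Alink's Ak*Exception family. As a hedged sketch only (the real classes live in com.alibaba.alink.common.exceptions and may carry extra state such as error codes), such a type presumably looks like a plain unchecked exception:

// Illustrative sketch, not the actual Alink source: a domain-specific
// unchecked exception, so callers can tell Alink parameter errors apart
// from arbitrary JDK exceptions.
public class AkIllegalOperatorParameterException extends RuntimeException {
    public AkIllegalOperatorParameterException(String message) {
        super(message);
    }
}
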
@@ -22,7 +22,8 @@ public class ParquetSourceFactoryImpl implements ParquetSourceFactory {
     public Tuple2 <RichInputFormat <Row, FileInputSplit>, TableSchema> createParquetSourceFunction(Params params) {
         FilePath filePath = FilePath.deserialize(params.get(ParquetSourceParams.FILE_PATH));
         MessageType messageType = ParquetUtil.getReadSchemaFromParquetFile(filePath);
-        RowTypeInfo rowTypeInfo = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(messageType);
+        RowTypeInfo rowTypeInfo = messageType == null ? new RowTypeInfo()
+            : (RowTypeInfo) ParquetSchemaConverter.fromParquetType(messageType);
         return Tuple2.of(new GenericParquetInputFormat(filePath),
             new TableSchema(rowTypeInfo.getFieldNames(), rowTypeInfo.getFieldTypes()));
     }
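
The messageType == null guard means an unreadable or mismatched parquet footer now degrades to an empty schema instead of a NullPointerException further down. A small illustration of the resulting behavior, using the same classes as the hunk (filePath is a hypothetical FilePath to a file without a readable footer):

// getReadSchemaFromParquetFile(...) now returns null instead of throwing.
MessageType messageType = ParquetUtil.getReadSchemaFromParquetFile(filePath);
RowTypeInfo rowTypeInfo = messageType == null
    ? new RowTypeInfo()   // zero-field fallback: Flink's varargs constructor with no args
    : (RowTypeInfo) ParquetSchemaConverter.fromParquetType(messageType);
// With the fallback, this TableSchema simply has no columns.
TableSchema schema = new TableSchema(rowTypeInfo.getFieldNames(), rowTypeInfo.getFieldTypes());
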
@@ -11,6 +11,8 @@
 import org.apache.flink.shaded.guava18.com.google.common.collect.HashBiMap;
 import org.apache.flink.table.api.TableSchema;
 
+import com.alibaba.alink.common.exceptions.AkIllegalDataException;
+import com.alibaba.alink.common.exceptions.AkIllegalOperatorParameterException;
 import com.alibaba.alink.common.io.filesystem.BaseFileSystem;
 import com.alibaba.alink.common.io.filesystem.FilePath;
 import org.apache.parquet.ParquetReadOptions;
@@ -21,13 +23,16 @@
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
 public class ParquetUtil {
     private static final BiMap <PrimitiveTypeName, TypeInformation <?>> PRIMITIVE_TYPE_MAP = HashBiMap.create();
+    private static final Logger LOG = LoggerFactory.getLogger(ParquetUtil.class);
 
     static {
         PRIMITIVE_TYPE_MAP.put(PrimitiveTypeName.BOOLEAN, Types.BOOLEAN);
@@ -44,7 +49,8 @@ public static MessageType getReadSchemaFromParquetFile(FilePath filePath) {
         try {
             messageType = readSchemaFromFile(filePath);
         } catch (IOException e) {
-            throw new IllegalArgumentException("cannot read parquet footer");
+            throw new AkIllegalOperatorParameterException(
+                String.format("Doesn't have access to %s", filePath.getPathStr()));
         }
         if (messageType == null) {return null;}
         RowTypeInfo schema = (RowTypeInfo) ParquetSchemaConverter.fromParquetType(messageType);
@@ -72,16 +78,20 @@ public static MessageType readSchemaFromFile(FilePath filePath) throws IOException {
                     return messageType;
                 }
             }
+            return null;
         } else {
             try (ParquetFileReader fileReader
                 = new ParquetFileReader(new ParquetInputFile(filePath),
                 ParquetReadOptions.builder().build())) {
                 return fileReader.getFileMetaData().getSchema();
-            }catch (IOException e){
+            } catch (Exception e) {
+                LOG.warn(
+                    String.format(
+                        "Escaped the file [%s] due to fail reading a parquet format footer",
+                        filePath.getPath().toString()));
+                return null;
             }
         }
-        return null;
     }
 
     public static class ParquetInputFile implements InputFile {
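
For context, the else branch above uses the standard parquet-mr footer API: constructing a ParquetFileReader reads only the footer metadata, and getFileMetaData().getSchema() yields the file's MessageType without decoding any row groups. A self-contained sketch of the same pattern against a local file (HadoopInputFile and the /tmp path are illustrative stand-ins for Alink's ParquetInputFile):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.schema.MessageType;

public class FooterSchemaDemo {
    public static void main(String[] args) throws Exception {
        // Opens the file and parses the footer only; no row groups are decoded.
        try (ParquetFileReader reader = new ParquetFileReader(
                HadoopInputFile.fromPath(new Path("/tmp/demo.parquet"), new Configuration()),
                ParquetReadOptions.builder().build())) {
            MessageType schema = reader.getFileMetaData().getSchema();
            System.out.println(schema);
        }
    }
}
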
@@ -2,6 +2,7 @@
 
 import org.apache.flink.ml.api.misc.param.Params;
 
+import com.alibaba.alink.common.exceptions.AkIllegalOperatorParameterException;
 import com.alibaba.alink.params.io.RedisParams;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,7 +50,7 @@ public Redis jedisCreate(Params params) {
                 redisStandaloneIp = redisIpPort.split(":")[0];
                 redisStandalonePort = Integer.parseInt(redisIpPort.split(":")[1]);
             } catch (NumberFormatException e) {
-                throw new IllegalArgumentException("illegal REDIS_IPS value, use 'ip:port' or ip alone");
+                throw new AkIllegalOperatorParameterException("illegal REDIS_IPS value, use 'ip:port' or ip alone");
             }
         } else {
             redisStandaloneIp = redisIpPort;
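
The guarded parse is easy to trip: anything after the colon that Integer.parseInt cannot handle raises NumberFormatException, which the new AkIllegalOperatorParameterException now reports with a pointer to the expected 'ip:port' format. A tiny illustration with a hypothetical bad config value:

String redisIpPort = "10.0.0.1:sixthousand";             // hypothetical malformed REDIS_IPS entry
String ip = redisIpPort.split(":")[0];                   // "10.0.0.1" -- fine
int port = Integer.parseInt(redisIpPort.split(":")[1]);  // throws NumberFormatException
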
3 changes: 2 additions & 1 deletion core/src/main/java/com/alibaba/alink/common/MTable.java
@@ -23,6 +23,7 @@
 
 import com.alibaba.alink.common.MTable.MTableDeserializer;
 import com.alibaba.alink.common.MTable.MTableSerializer;
+import com.alibaba.alink.common.exceptions.AkParseErrorException;
 import com.alibaba.alink.common.exceptions.MTableSerializerException;
 import com.alibaba.alink.common.io.filesystem.binary.BaseStreamRowSerializer;
 import com.alibaba.alink.common.io.filesystem.binary.RowStreamSerializer;
@@ -572,7 +573,7 @@ public static MTable readCsvFromFile(BufferedReader reader, String schemaStr) throws IOException {
             if (tuple2.f0) {
                 rows.add(tuple2.f1);
             } else {
-                throw new RuntimeException("Fail to parse line: \"" + line + "\"");
+                throw new AkParseErrorException("Fail to parse line: \"" + line + "\"");
             }
         }
         return new MTable(rows, schema);
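
A hedged usage sketch of readCsvFromFile as declared in the hunk header; the "name type" schema-string convention is an assumption based on Alink's usual schemaStr format:

import java.io.BufferedReader;
import java.io.StringReader;

import com.alibaba.alink.common.MTable;

public class ReadCsvDemo {
    public static void main(String[] args) throws Exception {
        BufferedReader reader = new BufferedReader(new StringReader("1,Alice\n2,Bob\n"));
        // Each line is parsed against the schema; a malformed line now surfaces
        // as AkParseErrorException instead of a bare RuntimeException.
        MTable table = MTable.readCsvFromFile(reader, "id int, name string");
        System.out.println(table);
    }
}
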
3 changes: 2 additions & 1 deletion core/src/main/java/com/alibaba/alink/common/MTableUtil.java
@@ -2,6 +2,7 @@
 
 import org.apache.flink.types.Row;
 
+import com.alibaba.alink.common.exceptions.AkIllegalDataException;
 import com.alibaba.alink.common.utils.TableUtil;
 
 import java.io.Serializable;
@@ -50,7 +51,7 @@ public static MTable getMTable(Object obj) {
         } else if (obj instanceof String) {
             return MTable.fromJson((String) obj);
         } else {
-            throw new RuntimeException("Type must be string or mtable");
+            throw new AkIllegalDataException("Type must be string or mtable");
         }
 
     }
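
Based on the visible branches, getMTable is a small coercion helper. A sketch of the three outcomes (mt and mtJson are hypothetical values; the MTable pass-through branch is truncated above and assumed):

// `mt` is an existing MTable; `mtJson` its JSON string form (hypothetical values).
MTable fromJson = MTableUtil.getMTable(mtJson); // String -> parsed via MTable.fromJson
MTable same = MTableUtil.getMTable(mt);         // MTable -> returned as-is (assumed branch)
MTableUtil.getMTable(42);                       // any other type -> AkIllegalDataException
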
@@ -1,9 +1,10 @@
 package com.alibaba.alink.common.dl;
 
-import org.apache.flink.util.Preconditions;
-
 import com.alibaba.alink.common.AlinkGlobalConfiguration;
 import com.alibaba.alink.common.dl.utils.PythonFileUtils;
+import com.alibaba.alink.common.exceptions.AkPluginErrorException;
+import com.alibaba.alink.common.exceptions.AkPreconditions;
+import com.alibaba.alink.common.exceptions.AkUnimplementedOperationException;
 import com.alibaba.alink.common.io.filesystem.FilePath;
 import com.alibaba.alink.common.io.plugin.RegisterKey;
 import com.alibaba.alink.common.io.plugin.ResourcePluginFactory;
@@ -82,16 +83,17 @@ static String getBertResource(ResourcePluginFactory factory, ModelName modelName
         if (null != pluginFilePath) {
             String directoryName = PythonFileUtils.getCompressedFileName(remotePath);
             File file = new File(pluginFilePath.getPath().toString(), directoryName);
-            Preconditions.checkArgument(file.exists() && file.isDirectory(),
-                String.format("There should be a directory named %s in plugin directory %s, but cannot be found.",
-                    directoryName, pluginFilePath.getPath().toString()));
+            AkPreconditions.checkArgument(file.exists() && file.isDirectory(),
+                new AkPluginErrorException(
+                    String.format("There should be a directory named %s in plugin directory %s, but cannot be found.",
+                        directoryName, pluginFilePath.getPath().toString())));
             return "file://" + file.getAbsolutePath();
         }
 
         // Use default PythonEnv path in PYTHON_ENV_MAP
         if (null == remotePath) {
-            throw new RuntimeException(String.format("Default resource path for %s %s not specified.",
-                modelName.name(), type.name()));
+            throw new AkUnimplementedOperationException(
+                String.format("Default resource path for %s %s not specified.", modelName.name(), type.name()));
         }
         LOG.info("Use plugin resource: {}", remotePath);
         if (AlinkGlobalConfiguration.isPrintProcessInfo()) {
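
AkPreconditions replaces org.apache.flink.util.Preconditions throughout this commit, and the call sites imply an overload that throws a caller-supplied exception. The following is a sketch of the surface inferred from usage in these hunks, not the actual Alink implementation (in particular, the default exception types thrown by checkState/checkNotNull are assumptions):

public final class AkPreconditions {
    // Inferred overload: throw exactly the exception the caller built.
    public static void checkArgument(boolean condition, RuntimeException toThrow) {
        if (!condition) {
            throw toThrow;
        }
    }

    public static void checkState(boolean condition, String errorMessage) {
        if (!condition) {
            throw new IllegalStateException(errorMessage); // real Ak exception type unknown
        }
    }

    public static <T> T checkNotNull(T reference, String errorMessage) {
        if (reference == null) {
            throw new NullPointerException(errorMessage); // real Ak exception type unknown
        }
        return reference;
    }
}
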
@@ -7,8 +7,8 @@
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
 
+import com.alibaba.alink.common.exceptions.AkPreconditions;
 import com.alibaba.alink.common.io.plugin.ResourcePluginFactory;
 import com.alibaba.flink.ml.cluster.ExecutionMode;
 import com.alibaba.flink.ml.cluster.MLConfig;
@@ -86,7 +86,7 @@ public void open(Configuration parameters) throws Exception {
             return;
         }
         stepNo = getIterationRuntimeContext().getSuperstepNumber();
-        Preconditions.checkArgument(stepNo <= 2);
+        AkPreconditions.checkState(stepNo <= 2);
         if (stepNo == 1) {
             ipPortFunction.open(getRuntimeContext());
         } else if (stepNo == 2) {
@@ -1,9 +1,9 @@
 package com.alibaba.alink.common.dl;
 
-import org.apache.flink.util.Preconditions;
-
 import com.alibaba.alink.common.AlinkGlobalConfiguration;
 import com.alibaba.alink.common.dl.utils.PythonFileUtils;
+import com.alibaba.alink.common.exceptions.AkPreconditions;
+import com.alibaba.alink.common.exceptions.AkUnimplementedOperationException;
 import com.alibaba.alink.common.io.filesystem.FilePath;
 import com.alibaba.alink.common.io.plugin.OsType;
 import com.alibaba.alink.common.io.plugin.OsUtils;
@@ -101,15 +101,16 @@ static String getDefaultPythonEnv(ResourcePluginFactory factory, Version version
         if (null != pluginFilePath) {
             String compressedFileName = PythonFileUtils.getCompressedFileName(remotePath);
             File directoryFile = new File(pluginFilePath.getPath().toString(), compressedFileName);
-            Preconditions.checkArgument(directoryFile.exists(),
+            AkPreconditions.checkState(directoryFile.exists(),
                 String.format("There should be a directory named %s in plugin directory %s, but cannot be found.",
                     compressedFileName, pluginFilePath.getPath().toString()));
             return "file://" + directoryFile.getAbsolutePath();
         }
 
         // Use default PythonEnv path in PYTHON_ENV_MAP
         if (null == remotePath) {
-            throw new RuntimeException(String.format("Default python env for %s not specified.", version.name()));
+            throw new AkUnimplementedOperationException(
+                String.format("Default python env for %s not specified.", version.name()));
         }
         return remotePath;
     }
@@ -8,14 +8,15 @@
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.StringUtils;
 
 import com.alibaba.alink.common.AlinkGlobalConfiguration;
 import com.alibaba.alink.common.dl.DLEnvConfig.Version;
 import com.alibaba.alink.common.dl.utils.DLUtils;
 import com.alibaba.alink.common.dl.utils.DataSetDiskDownloader;
 import com.alibaba.alink.common.dl.utils.PythonFileUtils;
+import com.alibaba.alink.common.exceptions.AkPreconditions;
+import com.alibaba.alink.common.exceptions.AkUnclassifiedErrorException;
 import com.alibaba.alink.common.io.plugin.ResourcePluginFactory;
 import com.alibaba.alink.common.utils.JsonConverter;
 import com.alibaba.flink.ml.cluster.ExecutionMode;
@@ -93,7 +94,7 @@ public static void prepareBroadcastData(String workDir, RuntimeContext runtimeContext
                 bw.write(sbd.toString());
             }
         } catch (IOException e) {
-            throw new RuntimeException("Fail to write broadcast data to local disk.");
+            throw new AkUnclassifiedErrorException("Fail to write broadcast data to local disk.");
         }
         LOG.info("Succ in writing bc data to {}", fn);
         DLUtils.safePutProperties(mlContext, DLConstants.BC_NAME_PREFIX + i, fn);
@@ -109,7 +110,7 @@ public void open(RuntimeContext runtimeContext) throws Exception {
         int numWorkers = Integer.parseInt(this.config.getProperties().get(DLConstants.NUM_WORKERS));
         int numPSs = Integer.parseInt(this.config.getProperties().get(DLConstants.NUM_PSS));
         List <Row> bc = runtimeContext.getBroadcastVariable(DLConstants.IP_PORT_BC_NAME);
-        Preconditions.checkArgument(bc.size() == (numWorkers + numPSs), "Some IPs and ports are missing.");
+        AkPreconditions.checkState(bc.size() == (numWorkers + numPSs), "Some IPs and ports are missing.");
         List <Tuple3 <Integer, String, Integer>> taskIpPorts = new ArrayList <>(bc.size());
         bc.forEach(row -> {
             String info = (String) row.getField(numOutputFields);
@@ -186,8 +187,7 @@ public void open(RuntimeContext runtimeContext) throws Exception {
             int taskId = taskIpPort.f0;
             ips[taskId] = taskIpPort.f1;
             if (thisTaskIndex == taskId) {
-                Preconditions.checkArgument(ips[taskId].equals(IpHostUtil.getIpAddress()),
-                    "task allocation changed");
+                AkPreconditions.checkState(ips[taskId].equals(IpHostUtil.getIpAddress()), "task allocation changed");
             }
             ports[taskId] = taskIpPort.f2;
         }
@@ -245,9 +245,9 @@ public void close() {
             serverFuture.cancel(true);
         } catch (ExecutionException e) {
             LOG.error(mlContext.getIdentity() + " node server failed");
-            throw new RuntimeException(e);
+            throw new AkUnclassifiedErrorException(mlContext.getIdentity() + " node server failed", e);
         } catch (Throwable th) {
-            throw new RuntimeException(th);
+            throw new AkUnclassifiedErrorException("Exception thrown.", th);
         } finally {
             serverFuture = null;
             long mumReadRecords = dataExchange.getReadRecords();
@@ -264,7 +264,7 @@ public void close() {
             }
             if (failNum > 0) {
                 //noinspection ThrowFromFinallyBlock
-                throw new RuntimeException("Python script run failed, please check TaskManager logs.");
+                throw new AkUnclassifiedErrorException("Python script run failed, please check TaskManager logs.");
             } else {
                 LOG.info("Records output: " + mumReadRecords);
             }
@@ -307,7 +307,7 @@ private void drainRead(Collector<Row> out, boolean readUntilEOF) {
                 serverFuture.cancel(true);
             } catch (IOException e) {
                 LOG.error("Fail to read data from python.", e);
-                throw new RuntimeException(e);
+                throw new AkUnclassifiedErrorException("Fail to read data from python.", e);
             }
         }
     }
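
A detail worth noting in these hunks: the old code threw RuntimeException(e) with no message of its own, while the replacements attach both a descriptive message and the original cause, so stack traces keep the root failure. A minimal illustration using the same fields as the hunk above:

try {
    serverFuture.get(); // blocking call from the hunk above; may fail asynchronously
} catch (ExecutionException e) {
    // Before: new RuntimeException(e) -- the message is just e.toString().
    // After: human-readable context plus the preserved cause chain.
    throw new AkUnclassifiedErrorException(mlContext.getIdentity() + " node server failed", e);
}
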
@@ -12,6 +12,7 @@
 import com.alibaba.alink.common.dl.utils.DLUtils;
 import com.alibaba.alink.common.dl.utils.ExternalFilesUtils;
 import com.alibaba.alink.common.dl.utils.PythonFileUtils;
+import com.alibaba.alink.common.exceptions.AkUnclassifiedErrorException;
 import com.alibaba.alink.common.io.plugin.ResourcePluginFactory;
 import com.alibaba.flink.ml.cluster.ExecutionMode;
 import com.alibaba.flink.ml.cluster.MLConfig;
@@ -169,7 +170,7 @@ private void startDLCluster() {
             dataExchange = dataExchangeFutureTaskThreadTuple3.f0;
             serverFuture = dataExchangeFutureTaskThreadTuple3.f1;
         } catch (Exception ex) {
-            throw new RuntimeException("Start TF cluster failed: ", ex);
+            throw new AkUnclassifiedErrorException("Start TF cluster failed: ", ex);
         }
     }
 
@@ -211,7 +212,7 @@ private void drainRead(Collector <Row> out, boolean readUntilEOF) {
                 serverFuture.cancel(true);
             } catch (IOException e) {
                 LOG.error("Fail to read data from python.", e);
-                throw new RuntimeException(e);
+                throw new AkUnclassifiedErrorException("Fail to read data from python.", e);
             }
         }
     }
@@ -19,10 +19,10 @@
 package com.alibaba.alink.common.dl;
 
 import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.Preconditions;
 
 import com.alibaba.alink.common.AlinkGlobalConfiguration;
 import com.alibaba.alink.common.dl.utils.PythonFileUtils;
+import com.alibaba.alink.common.exceptions.AkPreconditions;
 import com.alibaba.alink.common.io.plugin.OsType;
 import com.alibaba.alink.common.io.plugin.OsUtils;
 import com.alibaba.flink.ml.cluster.node.MLContext;
@@ -154,7 +154,7 @@ synchronized protected void callCondaUnpack(String virtualEnv) throws IOException {
 
         Path filePath = PythonFileUtils.createTempFile("call_conda_pack", fileSuffix);
         try (final InputStream is = getClass().getResourceAsStream(scriptResource)) {
-            Preconditions.checkNotNull(is, "Cannot get resource " + scriptResource);
+            AkPreconditions.checkNotNull(is, "Cannot get resource " + scriptResource);
             Files.copy(is, filePath, StandardCopyOption.REPLACE_EXISTING);
             Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                 try {
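
The surrounding pattern (copy a classpath resource to a temp file, then clean it up when the JVM exits) is standard JDK; a self-contained sketch with a hypothetical resource name:

import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class ResourceToTempFile {
    public static void main(String[] args) throws Exception {
        Path target = Files.createTempFile("call_conda_pack", ".sh");
        try (InputStream is = ResourceToTempFile.class.getResourceAsStream("/conda_pack.sh")) {
            if (is == null) { // the condition AkPreconditions.checkNotNull guards against
                throw new NullPointerException("Cannot get resource /conda_pack.sh");
            }
            Files.copy(is, target, StandardCopyOption.REPLACE_EXISTING);
        }
        target.toFile().deleteOnExit(); // best-effort cleanup, mirroring the shutdown hook above
    }
}
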