diff --git a/docs/get-started/Flink.md b/docs/get-started/Flink.md
new file mode 100644
index 000000000000..e23970955bca
--- /dev/null
+++ b/docs/get-started/Flink.md
@@ -0,0 +1,97 @@
+---
+layout: page
+title: Gluten For Flink with Velox Backend
+nav_order: 1
+parent: Getting-Started
+---
+
+# Supported Versions
+
+| Type  | Version                      |
+|-------|------------------------------|
+| Flink | 1.20                         |
+| OS    | Ubuntu20.04/22.04, Centos7/8 |
+| jdk   | openjdk11/jdk17              |
+| scala | 2.12                         |
+
+# Prerequisite
+
+Currently, with the static build, the Gluten+Flink+Velox backend supports all Linux OSes, but it is only tested on **Ubuntu20.04/Ubuntu22.04/Centos7/Centos8**. With the dynamic build, the Gluten+Velox backend supports **Ubuntu20.04/Ubuntu22.04/Centos7/Centos8** and their variants.
+
+Currently, the officially supported Flink versions are 1.20.*.
+
+We need to set the `JAVA_HOME` env variable. Currently, Gluten supports **java 11** and **java 17**.
+
+**For x86_64**
+
+```bash
+## make sure jdk11 (or jdk17) is used
+export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
+export PATH=$JAVA_HOME/bin:$PATH
+```
+
+**For aarch64**
+
+```bash
+## make sure jdk11 (or jdk17) is used
+export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-arm64
+export PATH=$JAVA_HOME/bin:$PATH
+```
+
+**Get gluten**
+
+```bash
+## config maven, like proxy in ~/.m2/settings.xml
+
+## fetch gluten code
+git clone https://github.com/apache/incubator-gluten.git
+```
+
+# Build Gluten Flink with Velox Backend
+
+```
+cd /path/to/gluten/gluten-flink
+mvn clean package
+```
+
+## Dependency library deployment
+
+Gluten for Flink depends on [Velox4j](https://github.com/velox4j/velox4j) to call Velox, so you need to get the Velox4j packages and use them with Gluten.
+The Velox4j jar available now is velox4j-0.1.0-SNAPSHOT.jar.
+
+## Submit the Flink SQL job
+
+Submit a test job with `flink run`. You can use the `StreamSQLExample` as an example.
+
+### Flink local cluster
+```
+var parquet_file_path = "/PATH/TO/TPCH_PARQUET_PATH"
+var gluten_root = "/PATH/TO/GLUTEN"
+```
+
+After deploying the Flink binaries, add the Gluten Flink jars to the Flink library path,
+including gluten-flink-runtime-1.4.0.jar, gluten-flink-loader-1.4.0.jar, and the Velox4j jar above,
+and make sure they are loaded before the Flink libraries.
+Then go to the Flink binary path and use the scripts below to
+submit the example job.
+
+```bash
+bin/start-cluster.sh
+bin/flink run -d -m 0.0.0.0:8080 \
+  -c org.apache.flink.table.examples.java.basics.StreamSQLExample \
+  lib/flink-examples-table_2.12-1.20.1.jar
+```
+
+Then you can find the result in `log/flink-*-taskexecutor-*.out`,
+and you can see an operator named `gluten-calc` in the web frontend of your Flink job.
+
+### Flink Yarn per job mode
+
+TODO
+
+## Notes:
+Neither Gluten for Flink nor Velox4j ships a bundled jar including all of its dependencies yet,
+so you may have to add these jars yourself, which may include guava-33.4.0-jre.jar, jackson-core-2.18.0.jar,
+jackson-databind-2.18.0.jar, jackson-datatype-jdk8-2.18.0.jar, jackson-annotations-2.18.0.jar, arrow-memory-core-18.1.0.jar,
+arrow-memory-unsafe-18.1.0.jar, arrow-vector-18.1.0.jar, flatbuffers-java-24.3.25.jar, arrow-format-18.1.0.jar, arrow-c-data-18.1.0.jar.
+We will supply bundled jars soon.
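+
+As a reference, deploying the jars could look like the following sketch. The paths below
+(`/path/to/flink`, `/path/to/gluten`, the local maven repository) are placeholders for your
+own environment, and the exact jar versions may differ from your build:
+
+```bash
+## copy the Gluten Flink jars and the Velox4j jar into Flink's lib/ directory,
+## assuming Flink is installed at /path/to/flink and Gluten was built under /path/to/gluten
+FLINK_HOME=/path/to/flink
+GLUTEN_HOME=/path/to/gluten/gluten-flink
+
+cp $GLUTEN_HOME/runtime/target/gluten-flink-runtime-1.4.0.jar $FLINK_HOME/lib/
+cp $GLUTEN_HOME/loader/target/gluten-flink-loader-1.4.0.jar $FLINK_HOME/lib/
+cp ~/.m2/repository/io/github/zhztheplayer/velox4j/0.1.0-SNAPSHOT/velox4j-0.1.0-SNAPSHOT.jar $FLINK_HOME/lib/
+```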
\ No newline at end of file
diff --git a/gluten-flink/README.md b/gluten-flink/README.md
new file mode 100644
index 000000000000..bcc6cf4f3075
--- /dev/null
+++ b/gluten-flink/README.md
@@ -0,0 +1,2 @@
+# Gluten Flink Project
+Gluten for Flink is under development now; you can refer to the [user guide](../docs/get-started/Flink.md) for quick usage.
\ No newline at end of file
diff --git a/gluten-flink/loader/pom.xml b/gluten-flink/loader/pom.xml
new file mode 100644
index 000000000000..bc32eedeac40
--- /dev/null
+++ b/gluten-flink/loader/pom.xml
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.gluten</groupId>
+    <artifactId>gluten-flink</artifactId>
+    <version>1.4.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>gluten-flink-loader</artifactId>
+  <name>Gluten Flink Loader</name>
+  <packaging>jar</packaging>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-core</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>gluten-flink-planner</artifactId>
+      <version>${project.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-table-planner-jars</id>
+            <phase>prepare-package</phase>
+            <goals>
+              <goal>copy</goal>
+            </goals>
+            <configuration>
+              <artifactItems>
+                <artifactItem>
+                  <groupId>org.apache.gluten</groupId>
+                  <artifactId>gluten-flink-planner</artifactId>
+                  <version>${project.version}</version>
+                  <type>jar</type>
+                  <overWrite>true</overWrite>
+                  <destFileName>gluten-flink-planner.jar</destFileName>
+                </artifactItem>
+              </artifactItems>
+              <outputDirectory>${project.build.directory}/classes</outputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/gluten-flink/loader/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java b/gluten-flink/loader/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java
new file mode 100644
index 000000000000..38f7b89c33df
--- /dev/null
+++ b/gluten-flink/loader/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.loader;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.classloading.ComponentClassLoader;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.delegation.ExecutorFactory;
+import org.apache.flink.table.delegation.PlannerFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+/**
+ * This class overwrites the PlannerModule in Flink to additionally load gluten-flink-planner.jar,
+ * so that the classes in the Gluten code are loaded first.
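+ *
+ * <p>Both planner jars are extracted from the classpath into a temporary directory and loaded
+ * through a dedicated component classloader, with the Gluten jar placed ahead of the Flink one
+ * so that its classes win the lookup.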
+ */
+class PlannerModule {
+
+    /**
+     * The name of the table planner dependency jar, bundled with flink-table-planner-loader module
+     * artifact.
+     */
+    static final String FLINK_TABLE_PLANNER_FAT_JAR = "flink-table-planner.jar";
+
+    private static final String HINT_USAGE =
+            "mvn clean package -pl flink-table/flink-table-planner,flink-table/flink-table-planner-loader -DskipTests";
+
+    private static final String[] OWNER_CLASSPATH =
+            Stream.concat(
+                            Arrays.stream(CoreOptions.PARENT_FIRST_LOGGING_PATTERNS),
+                            Stream.of(
+                                    // These packages are shipped either by
+                                    // flink-table-runtime or flink-dist itself
+                                    "org.codehaus.janino",
+                                    "org.codehaus.commons",
+                                    "org.apache.commons.lang3",
+                                    "org.apache.commons.math3",
+                                    // with hive dialect, the hadoop jar should be in the classpath;
+                                    // also, we should make it loaded by the owner classloader,
+                                    // otherwise it will throw ClassNotFoundException when
+                                    // initializing HiveParser, which requires hadoop
+                                    "org.apache.hadoop"))
+                    .toArray(String[]::new);
+
+    private static final String[] COMPONENT_CLASSPATH = new String[] {
+        "org.apache.flink",
+        "org.apache.gluten",
+        "io.github",
+        "com.google"
+    };
+
+    private static final Map<String, String> KNOWN_MODULE_ASSOCIATIONS = new HashMap<>();
+
+    static {
+        KNOWN_MODULE_ASSOCIATIONS.put("org.apache.gluten.table.runtime", "gluten-flink-runtime");
+        KNOWN_MODULE_ASSOCIATIONS.put("org.apache.flink.table.runtime", "flink-table-runtime");
+        KNOWN_MODULE_ASSOCIATIONS.put("org.apache.flink.formats.raw", "flink-table-runtime");
+
+        KNOWN_MODULE_ASSOCIATIONS.put("org.codehaus.janino", "flink-table-runtime");
+        KNOWN_MODULE_ASSOCIATIONS.put("org.codehaus.commons", "flink-table-runtime");
+        KNOWN_MODULE_ASSOCIATIONS.put(
+                "org.apache.flink.table.shaded.com.jayway", "flink-table-runtime");
+    }
+
+    private final PlannerComponentClassLoader submoduleClassLoader;
+
+    private PlannerModule() {
+        try {
+            final ClassLoader flinkClassLoader = PlannerModule.class.getClassLoader();
+
+            final Path tmpDirectory =
+                    Paths.get(ConfigurationUtils.parseTempDirectories(new Configuration())[0]);
+            Files.createDirectories(FileUtils.getTargetPathIfContainsSymbolicPath(tmpDirectory));
+            final Path tempFile =
+                    Files.createFile(
+                            tmpDirectory.resolve(
+                                    "flink-table-planner_" + UUID.randomUUID() + ".jar"));
+
+            final InputStream resourceStream =
+                    flinkClassLoader.getResourceAsStream(FLINK_TABLE_PLANNER_FAT_JAR);
+            InputStream glutenStream =
+                    flinkClassLoader.getResourceAsStream("gluten-flink-planner.jar");
+            if (resourceStream == null || glutenStream == null) {
+                throw new TableException(
+                        String.format(
+                                "Flink Table planner could not be found. If this happened while running a test in the IDE, "
+                                        + "run '%s' on the command-line, "
+                                        + "or add a test dependency on the flink-table-planner-loader test-jar.",
+                                HINT_USAGE));
+            }
+            final Path glutenFile =
+                    Files.createFile(
+                            tmpDirectory.resolve(
+                                    "gluten-flink-planner_" + UUID.randomUUID() + ".jar"));
+
+            IOUtils.copyBytes(resourceStream, Files.newOutputStream(tempFile));
+            IOUtils.copyBytes(glutenStream, Files.newOutputStream(glutenFile));
+            tempFile.toFile().deleteOnExit();
+            glutenFile.toFile().deleteOnExit();
+
+            this.submoduleClassLoader =
+                    new PlannerComponentClassLoader(
+                            new URL[] {
+                                glutenFile.toUri().toURL(),
+                                tempFile.toUri().toURL()},
+                            flinkClassLoader,
+                            OWNER_CLASSPATH,
+                            COMPONENT_CLASSPATH,
+                            KNOWN_MODULE_ASSOCIATIONS);
+        } catch (IOException e) {
+            throw new TableException(
+                    "Could not initialize the table planner components loader.", e);
+        }
+    }
+
+    public void addUrlToClassLoader(URL url) {
+        // add the url to the component classloader
+        this.submoduleClassLoader.addURL(url);
+    }
+
+    // Singleton lazy initialization
+
+    private static class PlannerComponentsHolder {
+        private static final PlannerModule INSTANCE = new PlannerModule();
+    }
+
+    public static PlannerModule getInstance() {
+        return PlannerComponentsHolder.INSTANCE;
+    }
+
+    // load methods for various components provided by the planner
+
+    public ExecutorFactory loadExecutorFactory() {
+        return FactoryUtil.discoverFactory(
+                this.submoduleClassLoader,
+                ExecutorFactory.class,
+                ExecutorFactory.DEFAULT_IDENTIFIER);
+    }
+
+    public PlannerFactory loadPlannerFactory() {
+        return FactoryUtil.discoverFactory(
+                this.submoduleClassLoader, PlannerFactory.class, PlannerFactory.DEFAULT_IDENTIFIER);
+    }
+
+    /**
+     * A class loader extending {@link ComponentClassLoader} which overrides the {@link #addURL}
+     * method so that URLs can be added to the component classloader at runtime.
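+     *
+     * <p>{@code addURL} is protected in {@link java.net.URLClassLoader}, which
+     * {@link ComponentClassLoader} builds on; widening it to public here is what lets
+     * {@link PlannerModule#addUrlToClassLoader} append jars on the fly.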
+     */
+    private static class PlannerComponentClassLoader extends ComponentClassLoader {
+
+        public PlannerComponentClassLoader(
+                URL[] classpath,
+                ClassLoader ownerClassLoader,
+                String[] ownerFirstPackages,
+                String[] componentFirstPackages,
+                Map<String, String> knownPackagePrefixesModuleAssociation) {
+            super(
+                    classpath,
+                    ownerClassLoader,
+                    ownerFirstPackages,
+                    componentFirstPackages,
+                    knownPackagePrefixesModuleAssociation);
+        }
+
+        @Override
+        public void addURL(URL url) {
+            super.addURL(url);
+        }
+    }
+}
diff --git a/gluten-flink/planner/pom.xml b/gluten-flink/planner/pom.xml
new file mode 100644
index 000000000000..b3fc899fe536
--- /dev/null
+++ b/gluten-flink/planner/pom.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.gluten</groupId>
+    <artifactId>gluten-flink</artifactId>
+    <version>1.4.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>gluten-flink-planner</artifactId>
+  <name>Gluten Flink Planner</name>
+  <packaging>jar</packaging>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-api-java</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>gluten-flink-runtime</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-clients</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.github.zhztheplayer</groupId>
+      <artifactId>velox4j</artifactId>
+      <version>${velox4j.version}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java
new file mode 100644
index 000000000000..7b1939f57b1b
--- /dev/null
+++ b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.gluten.rexnode.Utils;
+import org.apache.gluten.rexnode.LogicalTypeConverter;
+import org.apache.gluten.rexnode.RexNodeConverter;
+import org.apache.gluten.table.runtime.operators.GlutenCalOperator;
+
+import io.github.zhztheplayer.velox4j.connector.ExternalStreamTableHandle;
+import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.plan.FilterNode;
+import io.github.zhztheplayer.velox4j.plan.PlanNode;
+import io.github.zhztheplayer.velox4j.plan.ProjectNode;
+import io.github.zhztheplayer.velox4j.plan.TableScanNode;
+import io.github.zhztheplayer.velox4j.serde.Serde;
+import io.github.zhztheplayer.velox4j.type.Type;
+import org.apache.calcite.rex.RexNode;
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.TransformationMetadata;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+
+/** Gluten Stream {@link ExecNode} for Calc, using {@link GlutenCalOperator}.
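+ *
+ * <p>It converts the Calc's projection and condition RexNodes into a Velox
+ * FilterNode/ProjectNode pipeline and ships the serialized plan to the operator.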
+ */
+@ExecNodeMetadata(
+        name = "stream-exec-calc",
+        version = 1,
+        producedTransformations = CommonExecCalc.CALC_TRANSFORMATION,
+        minPlanVersion = FlinkVersion.v1_15,
+        minStateVersion = FlinkVersion.v1_15)
+public class StreamExecCalc extends CommonExecCalc implements StreamExecNode<RowData> {
+
+    public StreamExecCalc(
+            ReadableConfig tableConfig,
+            List<RexNode> projection,
+            @Nullable RexNode condition,
+            InputProperty inputProperty,
+            RowType outputType,
+            String description) {
+        this(
+                ExecNodeContext.newNodeId(),
+                ExecNodeContext.newContext(StreamExecCalc.class),
+                ExecNodeContext.newPersistedConfig(StreamExecCalc.class, tableConfig),
+                projection,
+                condition,
+                Collections.singletonList(inputProperty),
+                outputType,
+                description);
+    }
+
+    @JsonCreator
+    public StreamExecCalc(
+            @JsonProperty(FIELD_NAME_ID) int id,
+            @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+            @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
+            @JsonProperty(FIELD_NAME_PROJECTION) List<RexNode> projection,
+            @JsonProperty(FIELD_NAME_CONDITION) @Nullable RexNode condition,
+            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
+            @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+            @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+        super(
+                id,
+                context,
+                persistedConfig,
+                projection,
+                condition,
+                TableStreamOperator.class,
+                true, // retainHeader
+                inputProperties,
+                outputType,
+                description);
+    }
+
+    @Override
+    public Transformation<RowData> translateToPlanInternal(
+            PlannerBase planner, ExecNodeConfig config) {
+        final ExecEdge inputEdge = getInputEdges().get(0);
+        final Transformation<RowData> inputTransform =
+                (Transformation<RowData>) inputEdge.translateToPlan(planner);
+
+        // Add a mock input, as Velox does not allow a plan whose source is empty.
+        // TODO: remove it.
+        Type inputType = LogicalTypeConverter.toVLType(inputEdge.getOutputType());
+        List<String> inNames = Utils.getNamesFromRowType(inputEdge.getOutputType());
+        PlanNode mockInput = new TableScanNode(
+                String.valueOf(ExecNodeContext.newNodeId()),
+                inputType,
+                new ExternalStreamTableHandle("connector-external-stream"),
+                List.of());
+        PlanNode filter = new FilterNode(
+                String.valueOf(getId()),
+                List.of(mockInput),
+                RexNodeConverter.toTypedExpr(condition, inNames));
+        List<TypedExpr> projectExprs = RexNodeConverter.toTypedExpr(projection, inNames);
+        PlanNode project = new ProjectNode(
+                String.valueOf(ExecNodeContext.newNodeId()),
+                List.of(filter),
+                Utils.getNamesFromRowType(getOutputType()),
+                projectExprs);
+        // TODO: Velox4j does not support Java serialization for plans yet.
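+        // As a workaround, the plan and the input/output row types are serialized to JSON
+        // here on the planner side and deserialized again in GlutenCalOperator#open() /
+        // processElement() on the task side.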
+        Utils.registerRegistry();
+        String plan = Serde.toJson(project);
+        String inputStr = Serde.toJson(inputType);
+        Type outputType = LogicalTypeConverter.toVLType(getOutputType());
+        String outputStr = Serde.toJson(outputType);
+        final GlutenCalOperator calOperator =
+                new GlutenCalOperator(plan, mockInput.getId(), inputStr, outputStr);
+        return ExecNodeUtil.createOneInputTransformation(
+                inputTransform,
+                new TransformationMetadata("gluten-calc", "Gluten cal operator"),
+                calOperator,
+                InternalTypeInfo.of(getOutputType()),
+                inputTransform.getParallelism(),
+                false);
+    }
+}
diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/FunctionMappings.java b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/FunctionMappings.java
new file mode 100644
index 000000000000..77890461a7b9
--- /dev/null
+++ b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/FunctionMappings.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.rexnode;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Mapping between Flink functions and Velox functions. */
+public class FunctionMappings {
+    // A map storing the mapping from Flink function names to Velox function names.
+    private static Map<String, String> functionMappings = new HashMap<String, String>() {
+        {
+            // TODO: support more functions.
+            put(">", "greaterthan");
+            put("<", "lessthan");
+        }
+    };
+
+    public static String toVeloxFunction(String funcName) {
+        if (functionMappings.containsKey(funcName)) {
+            return functionMappings.get(funcName);
+        } else {
+            throw new RuntimeException("Function not supported: " + funcName);
+        }
+    }
+}
diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/LogicalTypeConverter.java b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/LogicalTypeConverter.java
new file mode 100644
index 000000000000..27e5ab6102d6
--- /dev/null
+++ b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/LogicalTypeConverter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.rexnode;
+
+import io.github.zhztheplayer.velox4j.type.IntegerType;
+import io.github.zhztheplayer.velox4j.type.Type;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Converter from Flink LogicalType to Velox data Type. */
+public class LogicalTypeConverter {
+
+    public static Type toVLType(LogicalType logicalType) {
+        if (logicalType instanceof RowType) {
+            RowType flinkRowType = (RowType) logicalType;
+            List<Type> fieldTypes = flinkRowType.getChildren().stream()
+                    .map(LogicalTypeConverter::toVLType)
+                    .collect(Collectors.toList());
+            return new io.github.zhztheplayer.velox4j.type.RowType(
+                    flinkRowType.getFieldNames(),
+                    fieldTypes);
+        } else if (logicalType instanceof IntType) {
+            return new IntegerType();
+        } else if (logicalType instanceof BigIntType) {
+            return new io.github.zhztheplayer.velox4j.type.BigIntType();
+        } else if (logicalType instanceof VarCharType) {
+            return new io.github.zhztheplayer.velox4j.type.VarCharType();
+        } else {
+            throw new RuntimeException("Unsupported logical type: " + logicalType);
+        }
+    }
+}
diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/RexNodeConverter.java b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/RexNodeConverter.java
new file mode 100644
index 000000000000..7c824bcb0327
--- /dev/null
+++ b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/RexNodeConverter.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.rexnode;
+
+import io.github.zhztheplayer.velox4j.expression.CallTypedExpr;
+import io.github.zhztheplayer.velox4j.expression.ConstantTypedExpr;
+import io.github.zhztheplayer.velox4j.expression.FieldAccessTypedExpr;
+import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.type.BigIntType;
+import io.github.zhztheplayer.velox4j.type.BooleanType;
+import io.github.zhztheplayer.velox4j.type.IntegerType;
+import io.github.zhztheplayer.velox4j.type.Type;
+import io.github.zhztheplayer.velox4j.type.VarCharType;
+import io.github.zhztheplayer.velox4j.variant.BigIntValue;
+import io.github.zhztheplayer.velox4j.variant.BooleanValue;
+import io.github.zhztheplayer.velox4j.variant.DoubleValue;
+import io.github.zhztheplayer.velox4j.variant.IntegerValue;
+import io.github.zhztheplayer.velox4j.variant.SmallIntValue;
+import io.github.zhztheplayer.velox4j.variant.TinyIntValue;
+import io.github.zhztheplayer.velox4j.variant.VarBinaryValue;
+import io.github.zhztheplayer.velox4j.variant.VarCharValue;
+import io.github.zhztheplayer.velox4j.variant.Variant;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Converter from Calcite RexNode to Velox TypedExpr. */
+public class RexNodeConverter {
+
+    public static TypedExpr toTypedExpr(RexNode rexNode, List<String> inNames) {
+        if (rexNode instanceof RexLiteral) {
+            RexLiteral literal = (RexLiteral) rexNode;
+            return new ConstantTypedExpr(
+                    toType(literal.getType()),
+                    toVariant(literal),
+                    null);
+        } else if (rexNode instanceof RexCall) {
+            RexCall rexCall = (RexCall) rexNode;
+            List<TypedExpr> params = toTypedExpr(rexCall.getOperands(), inNames);
+            Type nodeType = toType(rexCall.getType());
+            return new CallTypedExpr(
+                    nodeType,
+                    params,
+                    FunctionMappings.toVeloxFunction(rexCall.getOperator().getName()));
+        } else if (rexNode instanceof RexInputRef) {
+            RexInputRef inputRef = (RexInputRef) rexNode;
+            return FieldAccessTypedExpr.create(
+                    toType(inputRef.getType()),
+                    inNames.get(inputRef.getIndex()));
+        } else {
+            throw new RuntimeException("Unrecognized RexNode: " + rexNode);
+        }
+    }
+
+    public static List<TypedExpr> toTypedExpr(List<RexNode> rexNodes, List<String> inNames) {
+        return rexNodes.stream()
+                .map(rexNode -> toTypedExpr(rexNode, inNames))
+                .collect(Collectors.toList());
+    }
+
+    public static Type toType(RelDataType relDataType) {
+        switch (relDataType.getSqlTypeName()) {
+            case BOOLEAN:
+                return new BooleanType();
+            case INTEGER:
+                return new IntegerType();
+            case BIGINT:
+                return new BigIntType();
+            case VARCHAR:
+                return new VarCharType();
+            default:
+                throw new RuntimeException("Unsupported type: " + relDataType.getSqlTypeName());
+        }
+    }
+
+    public static Variant toVariant(RexLiteral literal) {
+        switch (literal.getType().getSqlTypeName()) {
+            case BOOLEAN:
+                return new BooleanValue((boolean) literal.getValue());
+            case TINYINT:
+                return new TinyIntValue(Integer.valueOf(literal.getValue().toString()));
+            case SMALLINT:
+                return new SmallIntValue(Integer.valueOf(literal.getValue().toString()));
+            case INTEGER:
+                return new IntegerValue(Integer.valueOf(literal.getValue().toString()));
+            case BIGINT:
+                return new BigIntValue(Long.valueOf(literal.getValue().toString()));
+            case DOUBLE:
+                return new DoubleValue(Double.valueOf(literal.getValue().toString()));
+            case VARCHAR:
+                return new VarCharValue(literal.getValue().toString());
+            case BINARY:
+                return new VarBinaryValue(literal.getValue().toString());
+            default:
+                throw new RuntimeException(
+                        "Unsupported rex node type: " + literal.getType().getSqlTypeName());
+        }
+    }
+
+}
diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/Utils.java b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/Utils.java
new file mode 100644
index 000000000000..643013fb2862
--- /dev/null
+++ b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/Utils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.rexnode;
+
+import io.github.zhztheplayer.velox4j.serializable.ISerializableRegistry;
+import io.github.zhztheplayer.velox4j.variant.VariantRegistry;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+
+/** Utility class holding some useful helper functions. */
+public class Utils {
+
+    private static boolean registryInitialized = false;
+
+    // Get field names from a row type, for the project node.
+    public static List<String> getNamesFromRowType(LogicalType logicalType) {
+        if (logicalType instanceof RowType) {
+            RowType rowType = (RowType) logicalType;
+            return rowType.getFieldNames();
+        } else {
+            throw new RuntimeException("Output type is not row type: " + logicalType);
+        }
+    }
+
+    // Initialize the serialization-related registries.
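+    // The Velox4j variant and ISerializable registries must be populated before
+    // Serde.toJson/fromJson is used on plan nodes; the flag above guards against
+    // repeated registration (note the check is not synchronized).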
+    public static void registerRegistry() {
+        if (!registryInitialized) {
+            registryInitialized = true;
+            VariantRegistry.registerAll();
+            ISerializableRegistry.registerAll();
+        }
+    }
+}
diff --git a/gluten-flink/pom.xml b/gluten-flink/pom.xml
new file mode 100644
index 000000000000..124049897751
--- /dev/null
+++ b/gluten-flink/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.gluten</groupId>
+    <artifactId>gluten-parent</artifactId>
+    <version>1.4.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>gluten-flink</artifactId>
+  <name>Gluten Flink</name>
+  <packaging>pom</packaging>
+
+  <modules>
+    <module>planner</module>
+    <module>loader</module>
+    <module>runtime</module>
+  </modules>
+
+  <properties>
+    <flink.version>1.20.0</flink.version>
+    <velox4j.version>0.1.0-SNAPSHOT</velox4j.version>
+  </properties>
+</project>
diff --git a/gluten-flink/runtime/pom.xml b/gluten-flink/runtime/pom.xml
new file mode 100644
index 000000000000..1454e09a0904
--- /dev/null
+++ b/gluten-flink/runtime/pom.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.gluten</groupId>
+    <artifactId>gluten-flink</artifactId>
+    <version>1.4.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>gluten-flink-runtime</artifactId>
+  <name>Gluten Flink Runtime</name>
+  <packaging>jar</packaging>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-runtime</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.github.zhztheplayer</groupId>
+      <artifactId>velox4j</artifactId>
+      <version>${velox4j.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory-core</artifactId>
+      <version>18.1.0</version>
+      <scope>compile</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/gluten-flink/runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/gluten-flink/runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
new file mode 100644
index 000000000000..101ec7737153
--- /dev/null
+++ b/gluten-flink/runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -0,0 +1,823 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.flink.runtime.taskexecutor; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JMXServerOptions; +import org.apache.flink.configuration.RpcOptions; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.configuration.TaskManagerOptionsInternal; +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.plugin.PluginManager; +import org.apache.flink.core.plugin.PluginUtils; +import org.apache.flink.core.security.FlinkSecurityManager; +import org.apache.flink.management.jmx.JMXService; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.blob.BlobCacheService; +import org.apache.flink.runtime.blob.BlobUtils; +import org.apache.flink.runtime.blob.TaskExecutorBlobService; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils; +import org.apache.flink.runtime.entrypoint.DeterminismEnvelope; +import org.apache.flink.runtime.entrypoint.FlinkParseException; +import org.apache.flink.runtime.entrypoint.WorkingDirectory; +import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider; +import org.apache.flink.runtime.externalresource.ExternalResourceUtils; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTrackerImpl; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.MetricRegistryImpl; +import org.apache.flink.runtime.metrics.ReporterSetup; +import org.apache.flink.runtime.metrics.TraceReporterSetup; +import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; +import org.apache.flink.runtime.metrics.util.MetricUtils; +import org.apache.flink.runtime.rpc.AddressResolution; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.RpcSystem; +import org.apache.flink.runtime.rpc.RpcSystemUtils; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.security.SecurityConfiguration; +import org.apache.flink.runtime.security.SecurityUtils; +import org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository; +import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader; +import org.apache.flink.runtime.taskmanager.MemoryLogger; +import org.apache.flink.runtime.util.ConfigurationParserUtils; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.Hardware; +import org.apache.flink.runtime.util.JvmShutdownSafeguard; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.util.AbstractID; +import org.apache.flink.util.AutoCloseableAsync; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.ExecutorUtils; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Reference; +import 
org.apache.flink.util.ShutdownHookUtil; +import org.apache.flink.util.StringUtils; +import org.apache.flink.util.TaskManagerExceptionUtils; +import org.apache.flink.util.concurrent.ExecutorThreadFactory; +import org.apache.flink.util.concurrent.FutureUtils; +import org.apache.flink.util.function.FunctionUtils; + +import io.github.zhztheplayer.velox4j.Velox4j; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.concurrent.GuardedBy; + +import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.InetAddress; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This class is the executable entry point for the task manager in yarn or standalone mode. It + * constructs the related components (network, I/O manager, memory manager, RPC service, HA service) + * and starts them. + */ +public class TaskManagerRunner implements FatalErrorHandler { + + private static final Logger LOG = LoggerFactory.getLogger(TaskManagerRunner.class); + + private static final long FATAL_ERROR_SHUTDOWN_TIMEOUT_MS = 10000L; + + private static final int SUCCESS_EXIT_CODE = 0; + @VisibleForTesting public static final int FAILURE_EXIT_CODE = 1; + + private final Thread shutdownHook; + + private final Object lock = new Object(); + + private final Configuration configuration; + + private final Time timeout; + + private final PluginManager pluginManager; + + private final TaskExecutorServiceFactory taskExecutorServiceFactory; + + private final CompletableFuture terminationFuture; + + @GuardedBy("lock") + private DeterminismEnvelope resourceId; + + /** Executor used to run future callbacks. 
*/ + @GuardedBy("lock") + private ExecutorService executor; + + @GuardedBy("lock") + private RpcSystem rpcSystem; + + @GuardedBy("lock") + private RpcService rpcService; + + @GuardedBy("lock") + private HighAvailabilityServices highAvailabilityServices; + + @GuardedBy("lock") + private MetricRegistryImpl metricRegistry; + + @GuardedBy("lock") + private BlobCacheService blobCacheService; + + @GuardedBy("lock") + private DeterminismEnvelope workingDirectory; + + @GuardedBy("lock") + private TaskExecutorService taskExecutorService; + + @GuardedBy("lock") + private boolean shutdown; + + public TaskManagerRunner( + Configuration configuration, + PluginManager pluginManager, + TaskExecutorServiceFactory taskExecutorServiceFactory) + throws Exception { + this.configuration = checkNotNull(configuration); + this.pluginManager = checkNotNull(pluginManager); + this.taskExecutorServiceFactory = checkNotNull(taskExecutorServiceFactory); + + timeout = Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION)); + + this.terminationFuture = new CompletableFuture<>(); + this.shutdown = false; + + this.shutdownHook = + ShutdownHookUtil.addShutdownHook( + () -> this.closeAsync(Result.JVM_SHUTDOWN).join(), + getClass().getSimpleName(), + LOG); + } + + private void startTaskManagerRunnerServices() throws Exception { + synchronized (lock) { + rpcSystem = RpcSystem.load(configuration); + + this.executor = + Executors.newScheduledThreadPool( + Hardware.getNumberCPUCores(), + new ExecutorThreadFactory("taskmanager-future")); + + highAvailabilityServices = + HighAvailabilityServicesUtils.createHighAvailabilityServices( + configuration, + executor, + AddressResolution.NO_ADDRESS_RESOLUTION, + rpcSystem, + this); + + JMXService.startInstance(configuration.get(JMXServerOptions.JMX_SERVER_PORT)); + + rpcService = createRpcService(configuration, highAvailabilityServices, rpcSystem); + + this.resourceId = + getTaskManagerResourceID( + configuration, rpcService.getAddress(), rpcService.getPort()); + + this.workingDirectory = + ClusterEntrypointUtils.createTaskManagerWorkingDirectory( + configuration, resourceId); + + LOG.info("Using working directory: {}", workingDirectory); + + HeartbeatServices heartbeatServices = + HeartbeatServices.fromConfiguration(configuration); + + metricRegistry = + new MetricRegistryImpl( + MetricRegistryConfiguration.fromConfiguration( + configuration, + rpcSystem.getMaximumMessageSizeInBytes(configuration)), + ReporterSetup.fromConfiguration(configuration, pluginManager), + TraceReporterSetup.fromConfiguration(configuration, pluginManager)); + + final RpcService metricQueryServiceRpcService = + MetricUtils.startRemoteMetricsRpcService( + configuration, + rpcService.getAddress(), + configuration.get(TaskManagerOptions.BIND_HOST), + rpcSystem); + metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId.unwrap()); + + blobCacheService = + BlobUtils.createBlobCacheService( + configuration, + Reference.borrowed(workingDirectory.unwrap().getBlobStorageDirectory()), + highAvailabilityServices.createBlobStore(), + null); + + final ExternalResourceInfoProvider externalResourceInfoProvider = + ExternalResourceUtils.createStaticExternalResourceInfoProviderFromConfig( + configuration, pluginManager); + + final DelegationTokenReceiverRepository delegationTokenReceiverRepository = + new DelegationTokenReceiverRepository(configuration, pluginManager); + + taskExecutorService = + taskExecutorServiceFactory.createTaskExecutor( + this.configuration, + this.resourceId.unwrap(), + 
rpcService, + highAvailabilityServices, + heartbeatServices, + metricRegistry, + blobCacheService, + false, + externalResourceInfoProvider, + workingDirectory.unwrap(), + this, + delegationTokenReceiverRepository); + + handleUnexpectedTaskExecutorServiceTermination(); + + MemoryLogger.startIfConfigured( + LOG, configuration, terminationFuture.thenAccept(ignored -> {})); + } + } + + @GuardedBy("lock") + private void handleUnexpectedTaskExecutorServiceTermination() { + taskExecutorService + .getTerminationFuture() + .whenComplete( + (unused, throwable) -> { + synchronized (lock) { + if (!shutdown) { + onFatalError( + new FlinkException( + "Unexpected termination of the TaskExecutor.", + throwable)); + } + } + }); + } + + // -------------------------------------------------------------------------------------------- + // Lifecycle management + // -------------------------------------------------------------------------------------------- + + public void start() throws Exception { + synchronized (lock) { + startTaskManagerRunnerServices(); + taskExecutorService.start(); + } + } + + public void close() throws Exception { + try { + closeAsync().get(); + } catch (ExecutionException e) { + ExceptionUtils.rethrowException(ExceptionUtils.stripExecutionException(e)); + } + } + + public CompletableFuture closeAsync() { + return closeAsync(Result.SUCCESS); + } + + private CompletableFuture closeAsync(Result terminationResult) { + synchronized (lock) { + // remove shutdown hook to prevent resource leaks + ShutdownHookUtil.removeShutdownHook(shutdownHook, this.getClass().getSimpleName(), LOG); + + if (shutdown) { + return terminationFuture; + } + + final CompletableFuture taskManagerTerminationFuture; + if (taskExecutorService != null) { + taskManagerTerminationFuture = taskExecutorService.closeAsync(); + } else { + taskManagerTerminationFuture = FutureUtils.completedVoidFuture(); + } + + final CompletableFuture serviceTerminationFuture = + FutureUtils.composeAfterwards( + taskManagerTerminationFuture, this::shutDownServices); + + final CompletableFuture workingDirCleanupFuture = + FutureUtils.runAfterwards( + serviceTerminationFuture, () -> deleteWorkingDir(terminationResult)); + + final CompletableFuture rpcSystemClassLoaderCloseFuture; + + if (rpcSystem != null) { + rpcSystemClassLoaderCloseFuture = + FutureUtils.runAfterwards(workingDirCleanupFuture, rpcSystem::close); + } else { + rpcSystemClassLoaderCloseFuture = FutureUtils.completedVoidFuture(); + } + + rpcSystemClassLoaderCloseFuture.whenComplete( + (Void ignored, Throwable throwable) -> { + if (throwable != null) { + terminationFuture.completeExceptionally(throwable); + } else { + terminationFuture.complete(terminationResult); + } + }); + + shutdown = true; + return terminationFuture; + } + } + + private void deleteWorkingDir(Result terminationResult) throws IOException { + synchronized (lock) { + if (workingDirectory != null) { + if (!workingDirectory.isDeterministic() || terminationResult == Result.SUCCESS) { + workingDirectory.unwrap().delete(); + } + } + } + } + + private CompletableFuture shutDownServices() { + synchronized (lock) { + Collection> terminationFutures = new ArrayList<>(3); + Exception exception = null; + + try { + JMXService.stopInstance(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + + if (blobCacheService != null) { + try { + blobCacheService.close(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + } + + if (metricRegistry != 
null) { + try { + terminationFutures.add(metricRegistry.closeAsync()); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + } + + if (highAvailabilityServices != null) { + try { + highAvailabilityServices.close(); + } catch (Exception e) { + exception = ExceptionUtils.firstOrSuppressed(e, exception); + } + } + + if (rpcService != null) { + terminationFutures.add(rpcService.closeAsync()); + } + + if (executor != null) { + terminationFutures.add( + ExecutorUtils.nonBlockingShutdown( + timeout.toMilliseconds(), TimeUnit.MILLISECONDS, executor)); + } + + if (exception != null) { + terminationFutures.add(FutureUtils.completedExceptionally(exception)); + } + + return FutureUtils.completeAll(terminationFutures); + } + } + + // export the termination future for caller to know it is terminated + public CompletableFuture getTerminationFuture() { + return terminationFuture; + } + + // -------------------------------------------------------------------------------------------- + // FatalErrorHandler methods + // -------------------------------------------------------------------------------------------- + + @Override + public void onFatalError(Throwable exception) { + TaskManagerExceptionUtils.tryEnrichTaskManagerError(exception); + LOG.error( + "Fatal error occurred while executing the TaskManager. Shutting it down...", + exception); + + if (ExceptionUtils.isJvmFatalOrOutOfMemoryError(exception)) { + terminateJVM(); + } else { + closeAsync(Result.FAILURE); + + FutureUtils.orTimeout( + terminationFuture, + FATAL_ERROR_SHUTDOWN_TIMEOUT_MS, + TimeUnit.MILLISECONDS, + String.format( + "Waiting for TaskManager shutting down timed out after %s ms.", + FATAL_ERROR_SHUTDOWN_TIMEOUT_MS)); + } + } + + private void terminateJVM() { + FlinkSecurityManager.forceProcessExit(FAILURE_EXIT_CODE); + } + + // -------------------------------------------------------------------------------------------- + // Static entry point + // -------------------------------------------------------------------------------------------- + + public static void main(String[] args) throws Exception { + // startup checks and logging + EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args); + SignalHandler.register(LOG); + JvmShutdownSafeguard.installAsShutdownHook(LOG); + + long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit(); + + if (maxOpenFileHandles != -1L) { + LOG.info("Maximum number of open file descriptors is {}.", maxOpenFileHandles); + } else { + LOG.info("Cannot determine the maximum number of open file descriptors"); + } + + runTaskManagerProcessSecurely(args); + } + + public static Configuration loadConfiguration(String[] args) throws FlinkParseException { + return ConfigurationParserUtils.loadCommonConfiguration( + args, TaskManagerRunner.class.getSimpleName()); + } + + public static int runTaskManager(Configuration configuration, PluginManager pluginManager) + throws Exception { + final TaskManagerRunner taskManagerRunner; + + try { + taskManagerRunner = + new TaskManagerRunner( + configuration, + pluginManager, + TaskManagerRunner::createTaskExecutorService); + taskManagerRunner.start(); + } catch (Exception exception) { + throw new FlinkException("Failed to start the TaskManagerRunner.", exception); + } + + try { + return taskManagerRunner.getTerminationFuture().get().getExitCode(); + } catch (Throwable t) { + throw new FlinkException( + "Unexpected failure during runtime of TaskManagerRunner.", + ExceptionUtils.stripExecutionException(t)); + } + } 
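+
+    // NOTE: the Gluten-specific change in this copied class is the Velox4j.initialize() call
+    // in runTaskManagerProcessSecurely(Configuration) below, which initializes Velox4j once
+    // per TaskManager process; everything else mirrors Flink's TaskManagerRunner.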
+ + public static void runTaskManagerProcessSecurely(String[] args) { + Configuration configuration = null; + + try { + configuration = loadConfiguration(args); + } catch (FlinkParseException fpe) { + LOG.error("Could not load the configuration.", fpe); + System.exit(FAILURE_EXIT_CODE); + } + + runTaskManagerProcessSecurely(checkNotNull(configuration)); + } + + public static void runTaskManagerProcessSecurely(Configuration configuration) { + FlinkSecurityManager.setFromConfiguration(configuration); + final PluginManager pluginManager = + PluginUtils.createPluginManagerFromRootFolder(configuration); + FileSystem.initialize(configuration, pluginManager); + + StateChangelogStorageLoader.initialize(pluginManager); + Velox4j.initialize(); + + int exitCode; + Throwable throwable = null; + + ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration); + try { + SecurityUtils.install(new SecurityConfiguration(configuration)); + + exitCode = + SecurityUtils.getInstalledContext() + .runSecured(() -> runTaskManager(configuration, pluginManager)); + } catch (Throwable t) { + throwable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class); + exitCode = FAILURE_EXIT_CODE; + } + + if (throwable != null) { + LOG.error("Terminating TaskManagerRunner with exit code {}.", exitCode, throwable); + } else { + LOG.info("Terminating TaskManagerRunner with exit code {}.", exitCode); + } + + System.exit(exitCode); + } + + // -------------------------------------------------------------------------------------------- + // Static utilities + // -------------------------------------------------------------------------------------------- + + public static TaskExecutorService createTaskExecutorService( + Configuration configuration, + ResourceID resourceID, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + MetricRegistry metricRegistry, + BlobCacheService blobCacheService, + boolean localCommunicationOnly, + ExternalResourceInfoProvider externalResourceInfoProvider, + WorkingDirectory workingDirectory, + FatalErrorHandler fatalErrorHandler, + DelegationTokenReceiverRepository delegationTokenReceiverRepository) + throws Exception { + + final TaskExecutor taskExecutor = + startTaskManager( + configuration, + resourceID, + rpcService, + highAvailabilityServices, + heartbeatServices, + metricRegistry, + blobCacheService, + localCommunicationOnly, + externalResourceInfoProvider, + workingDirectory, + fatalErrorHandler, + delegationTokenReceiverRepository); + + return TaskExecutorToServiceAdapter.createFor(taskExecutor); + } + + public static TaskExecutor startTaskManager( + Configuration configuration, + ResourceID resourceID, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + MetricRegistry metricRegistry, + TaskExecutorBlobService taskExecutorBlobService, + boolean localCommunicationOnly, + ExternalResourceInfoProvider externalResourceInfoProvider, + WorkingDirectory workingDirectory, + FatalErrorHandler fatalErrorHandler, + DelegationTokenReceiverRepository delegationTokenReceiverRepository) + throws Exception { + + checkNotNull(configuration); + checkNotNull(resourceID); + checkNotNull(rpcService); + checkNotNull(highAvailabilityServices); + + LOG.info("Starting TaskManager with ResourceID: {}", resourceID.getStringWithMetadata()); + + SystemOutRedirectionUtils.redirectSystemOutAndError(configuration); + + String externalAddress = rpcService.getAddress(); + 
+ final TaskExecutorResourceSpec taskExecutorResourceSpec = + TaskExecutorResourceUtils.resourceSpecFromConfig(configuration); + + TaskManagerServicesConfiguration taskManagerServicesConfiguration = + TaskManagerServicesConfiguration.fromConfiguration( + configuration, + resourceID, + externalAddress, + localCommunicationOnly, + taskExecutorResourceSpec, + workingDirectory); + + Tuple2 taskManagerMetricGroup = + MetricUtils.instantiateTaskManagerMetricGroup( + metricRegistry, + externalAddress, + resourceID, + taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval()); + + final ExecutorService ioExecutor = + Executors.newFixedThreadPool( + taskManagerServicesConfiguration.getNumIoThreads(), + new ExecutorThreadFactory("flink-taskexecutor-io")); + + TaskManagerServices taskManagerServices = + TaskManagerServices.fromConfiguration( + taskManagerServicesConfiguration, + taskExecutorBlobService.getPermanentBlobService(), + taskManagerMetricGroup.f1, + ioExecutor, + rpcService.getScheduledExecutor(), + fatalErrorHandler, + workingDirectory); + + MetricUtils.instantiateFlinkMemoryMetricGroup( + taskManagerMetricGroup.f1, + taskManagerServices.getTaskSlotTable(), + taskManagerServices::getManagedMemorySize); + + TaskManagerConfiguration taskManagerConfiguration = + TaskManagerConfiguration.fromConfiguration( + configuration, + taskExecutorResourceSpec, + externalAddress, + workingDirectory.getTmpDirectory()); + + String metricQueryServiceAddress = metricRegistry.getMetricQueryServiceGatewayRpcAddress(); + + return new TaskExecutor( + rpcService, + taskManagerConfiguration, + highAvailabilityServices, + taskManagerServices, + externalResourceInfoProvider, + heartbeatServices, + taskManagerMetricGroup.f0, + metricQueryServiceAddress, + taskExecutorBlobService, + fatalErrorHandler, + new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()), + delegationTokenReceiverRepository); + } + + /** + * Create a RPC service for the task manager. + * + * @param configuration The configuration for the TaskManager. 
+ * @param haServices to use for the task manager hostname retrieval + */ + @VisibleForTesting + static RpcService createRpcService( + final Configuration configuration, + final HighAvailabilityServices haServices, + final RpcSystem rpcSystem) + throws Exception { + + checkNotNull(configuration); + checkNotNull(haServices); + + return RpcUtils.createRemoteRpcService( + rpcSystem, + configuration, + determineTaskManagerBindAddress(configuration, haServices, rpcSystem), + configuration.get(TaskManagerOptions.RPC_PORT), + configuration.get(TaskManagerOptions.BIND_HOST), + configuration.getOptional(TaskManagerOptions.RPC_BIND_PORT)); + } + + private static String determineTaskManagerBindAddress( + final Configuration configuration, + final HighAvailabilityServices haServices, + RpcSystemUtils rpcSystemUtils) + throws Exception { + + final String configuredTaskManagerHostname = configuration.get(TaskManagerOptions.HOST); + + if (configuredTaskManagerHostname != null) { + LOG.info( + "Using configured hostname/address for TaskManager: {}.", + configuredTaskManagerHostname); + return configuredTaskManagerHostname; + } else { + return determineTaskManagerBindAddressByConnectingToResourceManager( + configuration, haServices, rpcSystemUtils); + } + } + + private static String determineTaskManagerBindAddressByConnectingToResourceManager( + final Configuration configuration, + final HighAvailabilityServices haServices, + RpcSystemUtils rpcSystemUtils) + throws LeaderRetrievalException { + + final Duration lookupTimeout = configuration.get(RpcOptions.LOOKUP_TIMEOUT_DURATION); + + final InetAddress taskManagerAddress = + LeaderRetrievalUtils.findConnectingAddress( + haServices.getResourceManagerLeaderRetriever(), + lookupTimeout, + rpcSystemUtils); + + LOG.info( + "TaskManager will use hostname/address '{}' ({}) for communication.", + taskManagerAddress.getHostName(), + taskManagerAddress.getHostAddress()); + + HostBindPolicy bindPolicy = + HostBindPolicy.fromString(configuration.get(TaskManagerOptions.HOST_BIND_POLICY)); + return bindPolicy == HostBindPolicy.IP + ? taskManagerAddress.getHostAddress() + : taskManagerAddress.getHostName(); + } + + @VisibleForTesting + static DeterminismEnvelope getTaskManagerResourceID( + Configuration config, String rpcAddress, int rpcPort) { + + final String metadata = + config.get(TaskManagerOptionsInternal.TASK_MANAGER_RESOURCE_ID_METADATA, ""); + return config.getOptional(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID) + .map( + value -> + DeterminismEnvelope.deterministicValue( + new ResourceID(value, metadata))) + .orElseGet( + FunctionUtils.uncheckedSupplier( + () -> { + final String hostName = + InetAddress.getLocalHost().getHostName(); + final String value = + StringUtils.isNullOrWhitespaceOnly(rpcAddress) + ? hostName + + "-" + + new AbstractID() + .toString() + .substring(0, 6) + : rpcAddress + + ":" + + rpcPort + + "-" + + new AbstractID() + .toString() + .substring(0, 6); + return DeterminismEnvelope.nondeterministicValue( + new ResourceID(value, metadata)); + })); + } + + /** Factory for {@link TaskExecutor}. 
*/ + public interface TaskExecutorServiceFactory { + TaskExecutorService createTaskExecutor( + Configuration configuration, + ResourceID resourceID, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + MetricRegistry metricRegistry, + BlobCacheService blobCacheService, + boolean localCommunicationOnly, + ExternalResourceInfoProvider externalResourceInfoProvider, + WorkingDirectory workingDirectory, + FatalErrorHandler fatalErrorHandler, + DelegationTokenReceiverRepository delegationTokenReceiverRepository) + throws Exception; + } + + public interface TaskExecutorService extends AutoCloseableAsync { + void start(); + + CompletableFuture<Void> getTerminationFuture(); + } + + public enum Result { + SUCCESS(SUCCESS_EXIT_CODE), + JVM_SHUTDOWN(FAILURE_EXIT_CODE), + FAILURE(FAILURE_EXIT_CODE); + + private final int exitCode; + + Result(int exitCode) { + this.exitCode = exitCode; + } + + public int getExitCode() { + return exitCode; + } + } +} diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenCalOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenCalOperator.java new file mode 100644 index 000000000000..609bd987cab2 --- /dev/null +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenCalOperator.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.gluten.table.runtime.operators; + +import io.github.zhztheplayer.velox4j.connector.ExternalStream; +import io.github.zhztheplayer.velox4j.connector.ExternalStreamConnectorSplit; +import io.github.zhztheplayer.velox4j.iterator.DownIterator; +import io.github.zhztheplayer.velox4j.iterator.UpIterator; +import io.github.zhztheplayer.velox4j.query.BoundSplit; +import io.github.zhztheplayer.velox4j.serde.Serde; +import io.github.zhztheplayer.velox4j.type.RowType; +import org.apache.gluten.vectorized.VLVectorIterator; +import org.apache.gluten.vectorized.FlinkRowToVLRowConvertor; + +import io.github.zhztheplayer.velox4j.Velox4j; +import io.github.zhztheplayer.velox4j.config.Config; +import io.github.zhztheplayer.velox4j.config.ConnectorConfig; +import io.github.zhztheplayer.velox4j.memory.AllocationListener; +import io.github.zhztheplayer.velox4j.memory.MemoryManager; +import io.github.zhztheplayer.velox4j.plan.PlanNode; +import io.github.zhztheplayer.velox4j.query.Query; +import io.github.zhztheplayer.velox4j.session.Session; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.operators.TableStreamOperator; + +import java.util.List; + +/** Calc operator for Gluten, which delegates execution of the serialized plan to Velox. */ +public class GlutenCalOperator extends TableStreamOperator<RowData> + implements OneInputStreamOperator<RowData, RowData>, GlutenOperator { + + private final String glutenPlan; + private final String id; + private final String inputType; + private final String outputType; + + private StreamRecord<RowData> outElement = null; + + private Session session; + private Query query; + private VLVectorIterator inputIterator; + BufferAllocator allocator; + + public GlutenCalOperator(String plan, String id, String inputType, String outputType) { + this.glutenPlan = plan; + this.id = id; + this.inputType = inputType; + this.outputType = outputType; + } + + @Override + public void open() throws Exception { + super.open(); + outElement = new StreamRecord<>(null); + session = Velox4j.newSession(MemoryManager.create(AllocationListener.NOOP)); + + // Bind the input iterator to Velox as an external stream, so Velox can pull the rows + // that Flink pushes in processElement(). + inputIterator = new VLVectorIterator(); + ExternalStream es = session.externalStreamOps().bind(new DownIterator(inputIterator)); + List<BoundSplit> splits = List.of( + new BoundSplit( + id, + -1, + new ExternalStreamConnectorSplit("connector-external-stream", es.id()))); + // Deserialize the plan shipped from the planner and build an executable query over the split. + PlanNode filter = Serde.fromJson(glutenPlan, PlanNode.class); + query = new Query(filter, splits, Config.empty(), ConnectorConfig.empty()); + allocator = new RootAllocator(Long.MAX_VALUE); + } + + @Override + public void processElement(StreamRecord<RowData> element) { + // Convert the Flink row to a Velox RowVector, run the query, and convert any result back. + inputIterator.addRow( + FlinkRowToVLRowConvertor.fromRowData( + element.getValue(), + allocator, + session, + Serde.fromJson(inputType, RowType.class))); + UpIterator result = session.queryOps().execute(query); + if (result.hasNext()) { + output.collect( + outElement.replace( + FlinkRowToVLRowConvertor.toRowData( + result.next(), + allocator, + session, + Serde.fromJson(outputType, RowType.class)))); + } + } +} diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOperator.java new file mode 100644 index 000000000000..9e8c6f012087 --- /dev/null +++ 
b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOperator.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gluten.table.runtime.operators; + +/** Interface for all gluten operators. */ +public interface GlutenOperator { +} diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLRowConvertor.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLRowConvertor.java new file mode 100644 index 000000000000..beb52433c00b --- /dev/null +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLRowConvertor.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.vectorized; + +import io.github.zhztheplayer.velox4j.data.RowVector; +import io.github.zhztheplayer.velox4j.session.Session; +import io.github.zhztheplayer.velox4j.type.BigIntType; +import io.github.zhztheplayer.velox4j.type.IntegerType; +import io.github.zhztheplayer.velox4j.type.RowType; +import io.github.zhztheplayer.velox4j.type.Type; +import io.github.zhztheplayer.velox4j.type.VarCharType; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.BigIntVector; +import org.apache.arrow.vector.IntVector; +import org.apache.arrow.vector.FieldVector; +import org.apache.arrow.vector.VarCharVector; +import org.apache.arrow.vector.table.Table; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.data.binary.BinaryStringData; + +import java.util.ArrayList; +import java.util.List; + +/** Converter between velox RowVector and Flink RowData. 
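+ * + * <p>A minimal usage sketch (hypothetical {@code typeJson}, {@code rowData}, {@code allocator} and {@code session} values; reflects the single-row conversion the class currently supports): + * + * <pre>{@code + * RowType rowType = Serde.fromJson(typeJson, RowType.class); + * RowVector vector = FlinkRowToVLRowConvertor.fromRowData(rowData, allocator, session, rowType); + * RowData back = FlinkRowToVLRowConvertor.toRowData(vector, allocator, session, rowType); + * }</pre>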
*/ +public class FlinkRowToVLRowConvertor { + + public static RowVector fromRowData( + RowData row, + BufferAllocator allocator, + Session session, + RowType rowType) { + // TODO: support more types + // Each field becomes a single-value Arrow vector (index 0); batching is not supported yet. + List<FieldVector> arrowVectors = new ArrayList<>(rowType.size()); + for (int i = 0; i < rowType.size(); i++) { + Type fieldType = rowType.getChildren().get(i); + if (fieldType instanceof IntegerType) { + IntVector intVector = new IntVector(rowType.getNames().get(i), allocator); + intVector.setSafe(0, row.getInt(i)); + intVector.setValueCount(1); + arrowVectors.add(i, intVector); + } else if (fieldType instanceof BigIntType) { + BigIntVector bigIntVector = new BigIntVector(rowType.getNames().get(i), allocator); + bigIntVector.setSafe(0, row.getLong(i)); + bigIntVector.setValueCount(1); + arrowVectors.add(i, bigIntVector); + } else if (fieldType instanceof VarCharType) { + VarCharVector stringVector = new VarCharVector(rowType.getNames().get(i), allocator); + stringVector.setSafe(0, row.getString(i).toBytes()); + stringVector.setValueCount(1); + arrowVectors.add(i, stringVector); + } else { + throw new RuntimeException("Unsupported field type: " + fieldType); + } + } + return session.arrowOps().fromArrowTable(allocator, new Table(arrowVectors)); + } + + public static RowData toRowData( + RowVector rowVector, + BufferAllocator allocator, + Session session, + RowType rowType) { + // TODO: support more types + // Reads the first value of each child vector, mirroring the single-row layout built above. + FieldVector fieldVector = session.arrowOps().toArrowVector( + allocator, + rowVector.loadedVector()); + List<Object> fieldValues = new ArrayList<>(rowType.size()); + for (int i = 0; i < rowType.size(); i++) { + Type fieldType = rowType.getChildren().get(i); + if (fieldType instanceof IntegerType) { + fieldValues.add(i, ((IntVector) fieldVector.getChildrenFromFields().get(i)).get(0)); + } else if (fieldType instanceof BigIntType) { + fieldValues.add(i, ((BigIntVector) fieldVector.getChildrenFromFields().get(i)).get(0)); + } else if (fieldType instanceof VarCharType) { + fieldValues.add( + i, + BinaryStringData.fromBytes( + ((VarCharVector) fieldVector.getChildrenFromFields().get(i)).get(0))); + } else { + throw new RuntimeException("Unsupported field type: " + fieldType); + } + } + return GenericRowData.of(fieldValues.toArray()); + } +} diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/VLVectorIterator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/VLVectorIterator.java new file mode 100644 index 000000000000..363bdfb423de --- /dev/null +++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/VLVectorIterator.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.gluten.vectorized; + +import io.github.zhztheplayer.velox4j.data.RowVector; + +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.NoSuchElementException; + +/** Iterator over Velox RowVectors, used to feed rows from Flink into Velox. */ +public class VLVectorIterator implements Iterator<RowVector> { + + private final List<RowVector> rows; + + public VLVectorIterator() { + this.rows = new LinkedList<>(); + } + + @Override + public boolean hasNext() { + return !rows.isEmpty(); + } + + @Override + public RowVector next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + return rows.remove(0); + } + + public void addRow(RowVector row) { + rows.add(row); + } +}
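+ +// Usage sketch (hypothetical variables, mirroring GlutenCalOperator#open): the iterator is +// bound to Velox4j as a DownIterator, so rows pushed from Flink via addRow() become available +// to the running Velox pipeline: +// +// VLVectorIterator input = new VLVectorIterator(); +// ExternalStream es = session.externalStreamOps().bind(new DownIterator(input)); +// input.addRow(rowVector);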