diff --git a/docs/get-started/Flink.md b/docs/get-started/Flink.md
new file mode 100644
index 000000000000..e23970955bca
--- /dev/null
+++ b/docs/get-started/Flink.md
@@ -0,0 +1,97 @@
+---
+layout: page
+title: Gluten For Flink with Velox Backend
+nav_order: 1
+parent: Getting-Started
+---
+
+# Supported Versions
+
+| Type  | Version                        |
+|-------|--------------------------------|
+| Flink | 1.20                           |
+| OS    | Ubuntu 20.04/22.04, CentOS 7/8 |
+| JDK   | OpenJDK 11 / JDK 17            |
+| Scala | 2.12                           |
+
+# Prerequisites
+
+Currently, with a static build, Gluten+Flink with the Velox backend supports all Linux OSes, but it is only tested on **Ubuntu 20.04/22.04** and **CentOS 7/8**. With a dynamic build, Gluten with the Velox backend supports **Ubuntu 20.04/22.04**, **CentOS 7/8**, and their variants.
+
+Currently, the officially supported Flink versions are 1.20.*.
+
+We need to set the `JAVA_HOME` environment variable. Currently, Gluten supports **Java 11** and **Java 17**.
+
+**For x86_64**
+
+```bash
+## make sure JDK 11 is used
+export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
+export PATH=$JAVA_HOME/bin:$PATH
+```
+
+**For aarch64**
+
+```bash
+## make sure JDK 11 is used
+export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-arm64
+export PATH=$JAVA_HOME/bin:$PATH
+```
+
+**Get Gluten**
+
+```bash
+## configure Maven, e.g. proxy settings, in ~/.m2/settings.xml
+
+## fetch gluten code
+git clone https://github.com/apache/incubator-gluten.git
+```
+
+# Build Gluten Flink with Velox Backend
+
+```bash
+cd /path/to/gluten/gluten-flink
+mvn clean package
+```
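+
+If the build succeeds, each submodule produces its jar under its own `target/` directory. A quick
+sanity check might look like this (the exact jar names depend on the version in the poms, e.g.
+1.4.0-SNAPSHOT, so wildcards are used here):
+
+```bash
+## Hypothetical check: list the jars produced by the build.
+ls planner/target/gluten-flink-planner-*.jar \
+   loader/target/gluten-flink-loader-*.jar \
+   runtime/target/gluten-flink-runtime-*.jar
+```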
+
+## Dependency library deployment
+
+Gluten for Flink depends on [Velox4j](https://github.com/velox4j/velox4j) to call Velox, so you need to get the Velox4j packages and use them with Gluten.
+The Velox4j jar currently available is velox4j-0.1.0-SNAPSHOT.jar.
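+
+If the Velox4j jar is not already available in your environment, one way to obtain it is to build and
+install it locally. The commands below are an assumption about the Velox4j build and may differ:
+
+```bash
+## Hypothetical sketch: build Velox4j and install it into the local Maven repository.
+git clone https://github.com/velox4j/velox4j.git
+cd velox4j
+mvn clean install -DskipTests
+```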
+
+## Submit the Flink SQL job
+
+Submit a test job via `flink run`. You can use the `StreamSQLExample` as an example.
+
+### Flink local cluster
+
+Set the paths below first if your example job needs them (e.g. the TPC-H parquet data path and the Gluten source root):
+```
+var parquet_file_path = "/PATH/TO/TPCH_PARQUET_PATH"
+var gluten_root = "/PATH/TO/GLUTEN"
+```
+
+After deploying the Flink binaries, add the gluten-flink jars to the Flink library path,
+including gluten-flink-runtime-1.4.0.jar, gluten-flink-loader-1.4.0.jar, and the Velox4j jar above,
+and make sure they are loaded before the Flink libraries.
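+
+For example, assuming `FLINK_HOME` points at your Flink deployment and the jars were built from this
+repository (paths are illustrative; a snapshot build may produce `*-1.4.0-SNAPSHOT.jar` names instead):
+
+```bash
+## Hypothetical paths; adjust to your build output and local Maven repository.
+cp gluten-flink/runtime/target/gluten-flink-runtime-*.jar $FLINK_HOME/lib/
+cp gluten-flink/loader/target/gluten-flink-loader-*.jar $FLINK_HOME/lib/
+cp ~/.m2/repository/io/github/zhztheplayer/velox4j/0.1.0-SNAPSHOT/velox4j-0.1.0-SNAPSHOT.jar $FLINK_HOME/lib/
+```
+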
+Then you can go to the Flink binary path and use the script below to
+submit the example job.
+
+```bash
+bin/start-cluster.sh
+bin/flink run -d -m 0.0.0.0:8080 \
+ -c org.apache.flink.table.examples.java.basics.StreamSQLExample \
+ lib/flink-examples-table_2.12-1.20.1.jar
+```
+
+Then you can find the result in `log/flink-*-taskexecutor-*.out`,
+and see an operator named `gluten-cal` in the web frontend of your Flink job.
+
+### Flink Yarn per job mode
+
+TODO
+
+## Notes:
+Neither Gluten for Flink nor Velox4j currently ships a bundled jar that includes all of its dependencies.
+So you may have to add these jars yourself, which may include guava-33.4.0-jre.jar, jackson-core-2.18.0.jar,
+jackson-databind-2.18.0.jar, jackson-datatype-jdk8-2.18.0.jar, jackson-annotations-2.18.0.jar, arrow-memory-core-18.1.0.jar,
+arrow-memory-unsafe-18.1.0.jar, arrow-vector-18.1.0.jar, flatbuffers-java-24.3.25.jar, arrow-format-18.1.0.jar and arrow-c-data-18.1.0.jar.
+We will supply bundled jars soon.
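+
+One way to collect these jars is to copy them from your local Maven repository into Flink's `lib/` directory.
+The paths below are illustrative and only a few of the jars are shown:
+
+```bash
+## Hypothetical sketch: copy dependency jars from the local Maven repo into Flink's lib/.
+## Repeat for the remaining jars listed above; adjust paths and versions to your environment.
+for jar in \
+    ~/.m2/repository/com/google/guava/guava/33.4.0-jre/guava-33.4.0-jre.jar \
+    ~/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.18.0/jackson-core-2.18.0.jar \
+    ~/.m2/repository/org/apache/arrow/arrow-vector/18.1.0/arrow-vector-18.1.0.jar; do
+  cp "$jar" $FLINK_HOME/lib/
+done
+```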
\ No newline at end of file
diff --git a/gluten-flink/README.md b/gluten-flink/README.md
new file mode 100644
index 000000000000..bcc6cf4f3075
--- /dev/null
+++ b/gluten-flink/README.md
@@ -0,0 +1,2 @@
+# Gluten Flink Project
+Gluten for Flink is under development now; you can refer to the [user guide](../docs/get-started/Flink.md) for quick-start usage.
\ No newline at end of file
diff --git a/gluten-flink/loader/pom.xml b/gluten-flink/loader/pom.xml
new file mode 100644
index 000000000000..bc32eedeac40
--- /dev/null
+++ b/gluten-flink/loader/pom.xml
@@ -0,0 +1,82 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.gluten</groupId>
+    <artifactId>gluten-flink</artifactId>
+    <version>1.4.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>gluten-flink-loader</artifactId>
+  <name>Gluten Flink Loader</name>
+  <packaging>jar</packaging>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-core</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>gluten-flink-planner</artifactId>
+      <version>${project.version}</version>
+      <scope>runtime</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-dependency-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>copy-table-planner-jars</id>
+            <phase>prepare-package</phase>
+            <goals>
+              <goal>copy</goal>
+            </goals>
+            <configuration>
+              <artifactItems>
+                <artifactItem>
+                  <groupId>org.apache.gluten</groupId>
+                  <artifactId>gluten-flink-planner</artifactId>
+                  <version>${project.version}</version>
+                  <type>jar</type>
+                  <overWrite>true</overWrite>
+                  <destFileName>gluten-flink-planner.jar</destFileName>
+                </artifactItem>
+              </artifactItems>
+              <outputDirectory>${project.build.directory}/classes</outputDirectory>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/gluten-flink/loader/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java b/gluten-flink/loader/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java
new file mode 100644
index 000000000000..38f7b89c33df
--- /dev/null
+++ b/gluten-flink/loader/src/main/java/org/apache/flink/table/planner/loader/PlannerModule.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.loader;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.core.classloading.ComponentClassLoader;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.delegation.ExecutorFactory;
+import org.apache.flink.table.delegation.PlannerFactory;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.IOUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+/**
+ * This class replaces Flink's PlannerModule to also load gluten-flink-planner.jar,
+ * so that the classes in the Gluten code are loaded first.
+ */
+class PlannerModule {
+
+ /**
+ * The name of the table planner dependency jar, bundled with flink-table-planner-loader module
+ * artifact.
+ */
+ static final String FLINK_TABLE_PLANNER_FAT_JAR = "flink-table-planner.jar";
+
+ private static final String HINT_USAGE =
+ "mvn clean package -pl flink-table/flink-table-planner,flink-table/flink-table-planner-loader -DskipTests";
+
+ private static final String[] OWNER_CLASSPATH =
+ Stream.concat(
+ Arrays.stream(CoreOptions.PARENT_FIRST_LOGGING_PATTERNS),
+ Stream.of(
+ // These packages are shipped either by
+ // flink-table-runtime or flink-dist itself
+ "org.codehaus.janino",
+ "org.codehaus.commons",
+ "org.apache.commons.lang3",
+ "org.apache.commons.math3",
+ // with hive dialect, hadoop jar should be in classpath,
+ // also, we should make it loaded by owner classloader,
+ // otherwise, it'll throw class not found exception
+ // when initialize HiveParser which requires hadoop
+ "org.apache.hadoop"))
+ .toArray(String[]::new);
+
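+    // Packages resolved component-first: Flink and Gluten classes, plus dependencies bundled
+    // with the planner jars (e.g. Velox4j under "io.github" and Guava under "com.google").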
+ private static final String[] COMPONENT_CLASSPATH = new String[] {
+ "org.apache.flink",
+ "org.apache.gluten",
+ "io.github",
+ "com.google"
+ };
+
+    private static final Map<String, String> KNOWN_MODULE_ASSOCIATIONS = new HashMap<>();
+
+ static {
+ KNOWN_MODULE_ASSOCIATIONS.put("org.apache.gluten.table.runtime", "gluten-flink-runtime");
+ KNOWN_MODULE_ASSOCIATIONS.put("org.apache.flink.table.runtime", "flink-table-runtime");
+ KNOWN_MODULE_ASSOCIATIONS.put("org.apache.flink.formats.raw", "flink-table-runtime");
+
+ KNOWN_MODULE_ASSOCIATIONS.put("org.codehaus.janino", "flink-table-runtime");
+ KNOWN_MODULE_ASSOCIATIONS.put("org.codehaus.commons", "flink-table-runtime");
+ KNOWN_MODULE_ASSOCIATIONS.put(
+ "org.apache.flink.table.shaded.com.jayway", "flink-table-runtime");
+ }
+
+ private final PlannerComponentClassLoader submoduleClassLoader;
+
+ private PlannerModule() {
+ try {
+ final ClassLoader flinkClassLoader = PlannerModule.class.getClassLoader();
+
+ final Path tmpDirectory =
+ Paths.get(ConfigurationUtils.parseTempDirectories(new Configuration())[0]);
+ Files.createDirectories(FileUtils.getTargetPathIfContainsSymbolicPath(tmpDirectory));
+ final Path tempFile =
+ Files.createFile(
+ tmpDirectory.resolve(
+ "flink-table-planner_" + UUID.randomUUID() + ".jar"));
+
+ final InputStream resourceStream =
+ flinkClassLoader.getResourceAsStream(FLINK_TABLE_PLANNER_FAT_JAR);
+ InputStream glutenStream =
+ flinkClassLoader.getResourceAsStream("gluten-flink-planner.jar");
+ if (resourceStream == null || glutenStream == null) {
+ throw new TableException(
+ String.format(
+ "Flink Table planner could not be found. If this happened while running a test in the IDE, "
+ + "run '%s' on the command-line, "
+ + "or add a test dependency on the flink-table-planner-loader test-jar.",
+ HINT_USAGE));
+ }
+ final Path glutenFile =
+ Files.createFile(
+ tmpDirectory.resolve(
+ "gluten-flink-planner_" + UUID.randomUUID() + ".jar"));
+
+ IOUtils.copyBytes(resourceStream, Files.newOutputStream(tempFile));
+ IOUtils.copyBytes(glutenStream, Files.newOutputStream(glutenFile));
+ tempFile.toFile().deleteOnExit();
+ glutenFile.toFile().deleteOnExit();
+
+ this.submoduleClassLoader =
+ new PlannerComponentClassLoader(
+ new URL[] {
+ glutenFile.toUri().toURL(),
+ tempFile.toUri().toURL()},
+ flinkClassLoader,
+ OWNER_CLASSPATH,
+ COMPONENT_CLASSPATH,
+ KNOWN_MODULE_ASSOCIATIONS);
+ } catch (IOException e) {
+ throw new TableException(
+ "Could not initialize the table planner components loader.", e);
+ }
+ }
+
+ public void addUrlToClassLoader(URL url) {
+ // add the url to component url
+ this.submoduleClassLoader.addURL(url);
+ }
+
+ // Singleton lazy initialization
+
+ private static class PlannerComponentsHolder {
+ private static final PlannerModule INSTANCE = new PlannerModule();
+ }
+
+ public static PlannerModule getInstance() {
+ return PlannerComponentsHolder.INSTANCE;
+ }
+
+ // load methods for various components provided by the planner
+
+ public ExecutorFactory loadExecutorFactory() {
+ return FactoryUtil.discoverFactory(
+ this.submoduleClassLoader,
+ ExecutorFactory.class,
+ ExecutorFactory.DEFAULT_IDENTIFIER);
+ }
+
+ public PlannerFactory loadPlannerFactory() {
+ return FactoryUtil.discoverFactory(
+ this.submoduleClassLoader, PlannerFactory.class, PlannerFactory.DEFAULT_IDENTIFIER);
+ }
+
+    /**
+     * A class loader extending {@link ComponentClassLoader} which overrides {@link #addURL}
+     * so that URLs can be added to the component class loader.
+     */
+ private static class PlannerComponentClassLoader extends ComponentClassLoader {
+
+ public PlannerComponentClassLoader(
+ URL[] classpath,
+ ClassLoader ownerClassLoader,
+ String[] ownerFirstPackages,
+ String[] componentFirstPackages,
+            Map<String, String> knownPackagePrefixesModuleAssociation) {
+ super(
+ classpath,
+ ownerClassLoader,
+ ownerFirstPackages,
+ componentFirstPackages,
+ knownPackagePrefixesModuleAssociation);
+ }
+
+ @Override
+ public void addURL(URL url) {
+ super.addURL(url);
+ }
+ }
+}
diff --git a/gluten-flink/planner/pom.xml b/gluten-flink/planner/pom.xml
new file mode 100644
index 000000000000..b3fc899fe536
--- /dev/null
+++ b/gluten-flink/planner/pom.xml
@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.gluten</groupId>
+    <artifactId>gluten-flink</artifactId>
+    <version>1.4.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>gluten-flink-planner</artifactId>
+  <name>Gluten Flink Planner</name>
+  <packaging>jar</packaging>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-api-java</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.gluten</groupId>
+      <artifactId>gluten-flink-runtime</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-clients</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.github.zhztheplayer</groupId>
+      <artifactId>velox4j</artifactId>
+      <version>${velox4j.version}</version>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java
new file mode 100644
index 000000000000..7b1939f57b1b
--- /dev/null
+++ b/gluten-flink/planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecCalc.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.nodes.exec.stream;
+
+import org.apache.gluten.rexnode.Utils;
+import org.apache.gluten.rexnode.LogicalTypeConverter;
+import org.apache.gluten.rexnode.RexNodeConverter;
+import org.apache.gluten.table.runtime.operators.GlutenCalOperator;
+
+import io.github.zhztheplayer.velox4j.connector.ExternalStreamTableHandle;
+import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.plan.FilterNode;
+import io.github.zhztheplayer.velox4j.plan.PlanNode;
+import io.github.zhztheplayer.velox4j.plan.ProjectNode;
+import io.github.zhztheplayer.velox4j.plan.TableScanNode;
+import io.github.zhztheplayer.velox4j.serde.Serde;
+import io.github.zhztheplayer.velox4j.type.Type;
+import org.apache.calcite.rex.RexNode;
+import org.apache.flink.FlinkVersion;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeMetadata;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
+import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
+import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
+import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.ExecNodeUtil;
+import org.apache.flink.table.planner.plan.nodes.exec.utils.TransformationMetadata;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+
+/** Gluten Stream {@link ExecNode} for Calc to use {@link GlutenCalOperator}. */
+@ExecNodeMetadata(
+ name = "stream-exec-calc",
+ version = 1,
+ producedTransformations = CommonExecCalc.CALC_TRANSFORMATION,
+ minPlanVersion = FlinkVersion.v1_15,
+ minStateVersion = FlinkVersion.v1_15)
+public class StreamExecCalc extends CommonExecCalc implements StreamExecNode {
+
+ public StreamExecCalc(
+ ReadableConfig tableConfig,
+            List<RexNode> projection,
+ @Nullable RexNode condition,
+ InputProperty inputProperty,
+ RowType outputType,
+ String description) {
+ this(
+ ExecNodeContext.newNodeId(),
+ ExecNodeContext.newContext(StreamExecCalc.class),
+ ExecNodeContext.newPersistedConfig(StreamExecCalc.class, tableConfig),
+ projection,
+ condition,
+ Collections.singletonList(inputProperty),
+ outputType,
+ description);
+ }
+
+ @JsonCreator
+ public StreamExecCalc(
+ @JsonProperty(FIELD_NAME_ID) int id,
+ @JsonProperty(FIELD_NAME_TYPE) ExecNodeContext context,
+ @JsonProperty(FIELD_NAME_CONFIGURATION) ReadableConfig persistedConfig,
+            @JsonProperty(FIELD_NAME_PROJECTION) List<RexNode> projection,
+ @JsonProperty(FIELD_NAME_CONDITION) @Nullable RexNode condition,
+            @JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
+ @JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
+ @JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
+ super(
+ id,
+ context,
+ persistedConfig,
+ projection,
+ condition,
+ TableStreamOperator.class,
+ true, // retainHeader
+ inputProperties,
+ outputType,
+ description);
+ }
+
+ @Override
+    public Transformation<RowData> translateToPlanInternal(
+            PlannerBase planner, ExecNodeConfig config) {
+        final ExecEdge inputEdge = getInputEdges().get(0);
+        final Transformation<RowData> inputTransform =
+                (Transformation<RowData>) inputEdge.translateToPlan(planner);
+
+        // Add a mock input, because Velox does not allow an empty source.
+        // TODO: remove it.
+        Type inputType = LogicalTypeConverter.toVLType(inputEdge.getOutputType());
+        List<String> inNames = Utils.getNamesFromRowType(inputEdge.getOutputType());
+ PlanNode mockInput = new TableScanNode(
+ String.valueOf(ExecNodeContext.newNodeId()),
+ inputType,
+ new ExternalStreamTableHandle("connector-external-stream"),
+ List.of());
+ PlanNode filter = new FilterNode(
+ String.valueOf(getId()),
+ List.of(mockInput),
+ RexNodeConverter.toTypedExpr(condition, inNames));
+        List<TypedExpr> projectExprs = RexNodeConverter.toTypedExpr(projection, inNames);
+ PlanNode project = new ProjectNode(
+ String.valueOf(ExecNodeContext.newNodeId()),
+ List.of(filter),
+ Utils.getNamesFromRowType(getOutputType()),
+ projectExprs);
+        // TODO: Velox4j does not support serialization yet.
+ Utils.registerRegistry();
+ String plan = Serde.toJson(project);
+ String inputStr = Serde.toJson(inputType);
+ Type outputType = LogicalTypeConverter.toVLType(getOutputType());
+ String outputStr = Serde.toJson(outputType);
+ final GlutenCalOperator calOperator = new GlutenCalOperator(plan, mockInput.getId(), inputStr, outputStr);
+ return ExecNodeUtil.createOneInputTransformation(
+ inputTransform,
+ new TransformationMetadata("gluten-calc", "Gluten cal operator"),
+ calOperator,
+ InternalTypeInfo.of(getOutputType()),
+ inputTransform.getParallelism(),
+ false);
+ }
+}
diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/FunctionMappings.java b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/FunctionMappings.java
new file mode 100644
index 000000000000..77890461a7b9
--- /dev/null
+++ b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/FunctionMappings.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.rexnode;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Mappings between Flink function names and Velox function names. */
+public class FunctionMappings {
+    // Maps a Flink function name to the corresponding Velox function name.
+    private static Map<String, String> functionMappings = new HashMap<String, String>() {
+        {
+            // TODO: support more functions.
+            put(">", "greaterthan");
+            put("<", "lessthan");
+        }
+    };
+
+ public static String toVeloxFunction(String funcName) {
+ if (functionMappings.containsKey(funcName)) {
+ return functionMappings.get(funcName);
+ } else {
+ throw new RuntimeException("Function not supported: " + funcName);
+ }
+ }
+}
diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/LogicalTypeConverter.java b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/LogicalTypeConverter.java
new file mode 100644
index 000000000000..27e5ab6102d6
--- /dev/null
+++ b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/LogicalTypeConverter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.rexnode;
+
+import io.github.zhztheplayer.velox4j.type.IntegerType;
+import io.github.zhztheplayer.velox4j.type.Type;
+import org.apache.flink.table.types.logical.BigIntType;
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.VarCharType;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Converter from Flink LogicalType to Velox data Type. */
+public class LogicalTypeConverter {
+
+ public static Type toVLType(LogicalType logicalType) {
+ if (logicalType instanceof RowType) {
+ RowType flinkRowType = (RowType) logicalType;
+            List<Type> fieldTypes = flinkRowType.getChildren().stream()
+                    .map(LogicalTypeConverter::toVLType)
+                    .collect(Collectors.toList());
+ return new io.github.zhztheplayer.velox4j.type.RowType(
+ flinkRowType.getFieldNames(),
+ fieldTypes);
+ } else if (logicalType instanceof IntType) {
+ return new IntegerType();
+ } else if (logicalType instanceof BigIntType) {
+ return new io.github.zhztheplayer.velox4j.type.BigIntType();
+ } else if (logicalType instanceof VarCharType) {
+ return new io.github.zhztheplayer.velox4j.type.VarCharType();
+ } else {
+ throw new RuntimeException("Unsupported logical type: " + logicalType);
+ }
+ }
+}
diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/RexNodeConverter.java b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/RexNodeConverter.java
new file mode 100644
index 000000000000..7c824bcb0327
--- /dev/null
+++ b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/RexNodeConverter.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.rexnode;
+
+import io.github.zhztheplayer.velox4j.expression.CallTypedExpr;
+import io.github.zhztheplayer.velox4j.expression.ConstantTypedExpr;
+import io.github.zhztheplayer.velox4j.expression.FieldAccessTypedExpr;
+import io.github.zhztheplayer.velox4j.expression.TypedExpr;
+import io.github.zhztheplayer.velox4j.type.BigIntType;
+import io.github.zhztheplayer.velox4j.type.BooleanType;
+import io.github.zhztheplayer.velox4j.type.IntegerType;
+import io.github.zhztheplayer.velox4j.type.Type;
+import io.github.zhztheplayer.velox4j.type.VarCharType;
+import io.github.zhztheplayer.velox4j.variant.BigIntValue;
+import io.github.zhztheplayer.velox4j.variant.BooleanValue;
+import io.github.zhztheplayer.velox4j.variant.DoubleValue;
+import io.github.zhztheplayer.velox4j.variant.IntegerValue;
+import io.github.zhztheplayer.velox4j.variant.SmallIntValue;
+import io.github.zhztheplayer.velox4j.variant.TinyIntValue;
+import io.github.zhztheplayer.velox4j.variant.VarBinaryValue;
+import io.github.zhztheplayer.velox4j.variant.VarCharValue;
+import io.github.zhztheplayer.velox4j.variant.Variant;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexInputRef;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Converter from Calcite RexNode to Velox TypedExpr. */
+public class RexNodeConverter {
+
+    public static TypedExpr toTypedExpr(RexNode rexNode, List<String> inNames) {
+ if (rexNode instanceof RexLiteral) {
+ RexLiteral literal = (RexLiteral) rexNode;
+ return new ConstantTypedExpr(
+ toType(literal.getType()),
+ toVariant(literal),
+ null);
+ } else if (rexNode instanceof RexCall) {
+ RexCall rexCall = (RexCall) rexNode;
+            List<TypedExpr> params = toTypedExpr(rexCall.getOperands(), inNames);
+ Type nodeType = toType(rexCall.getType());
+ return new CallTypedExpr(
+ nodeType,
+ params,
+ FunctionMappings.toVeloxFunction(rexCall.getOperator().getName()));
+ } else if (rexNode instanceof RexInputRef) {
+ RexInputRef inputRef = (RexInputRef) rexNode;
+ return FieldAccessTypedExpr.create(
+ toType(inputRef.getType()),
+ inNames.get(inputRef.getIndex()));
+ } else {
+ throw new RuntimeException("Unrecognized RexNode: " + rexNode);
+ }
+ }
+
+    public static List<TypedExpr> toTypedExpr(List<RexNode> rexNodes, List<String> inNames) {
+ return rexNodes.stream()
+ .map(rexNode -> toTypedExpr(rexNode, inNames))
+ .collect(Collectors.toList());
+ }
+
+ public static Type toType(RelDataType relDataType) {
+ switch (relDataType.getSqlTypeName()) {
+ case BOOLEAN:
+ return new BooleanType();
+ case INTEGER:
+ return new IntegerType();
+ case BIGINT:
+ return new BigIntType();
+ case VARCHAR:
+ return new VarCharType();
+ default:
+ throw new RuntimeException("Unsupported type: " + relDataType.getSqlTypeName());
+ }
+ }
+
+ public static Variant toVariant(RexLiteral literal) {
+ switch (literal.getType().getSqlTypeName()) {
+ case BOOLEAN:
+ return new BooleanValue((boolean) literal.getValue());
+ case TINYINT:
+ return new TinyIntValue(Integer.valueOf(literal.getValue().toString()));
+ case SMALLINT:
+ return new SmallIntValue(Integer.valueOf(literal.getValue().toString()));
+ case INTEGER:
+ return new IntegerValue(Integer.valueOf(literal.getValue().toString()));
+ case BIGINT:
+ return new BigIntValue(Long.valueOf(literal.getValue().toString()));
+ case DOUBLE:
+ return new DoubleValue(Double.valueOf(literal.getValue().toString()));
+ case VARCHAR:
+ return new VarCharValue(literal.getValue().toString());
+ case BINARY:
+ return new VarBinaryValue(literal.getValue().toString());
+ default:
+ throw new RuntimeException(
+ "Unsupported rex node type: " + literal.getType().getSqlTypeName());
+ }
+ }
+
+}
diff --git a/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/Utils.java b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/Utils.java
new file mode 100644
index 000000000000..643013fb2862
--- /dev/null
+++ b/gluten-flink/planner/src/main/java/org/apache/gluten/rexnode/Utils.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.rexnode;
+
+import io.github.zhztheplayer.velox4j.serializable.ISerializableRegistry;
+import io.github.zhztheplayer.velox4j.variant.VariantRegistry;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.util.List;
+
+/** Utility class providing shared helper functions. */
+public class Utils {
+
+ private static boolean registryInitialized = false;
+ // Get names for project node.
+    public static List<String> getNamesFromRowType(LogicalType logicalType) {
+ if (logicalType instanceof RowType) {
+ RowType rowType = (RowType) logicalType;
+ return rowType.getFieldNames();
+ } else {
+ throw new RuntimeException("Output type is not row type: " + logicalType);
+ }
+ }
+
+    // Initialize serialization-related registries (only once).
+    public static synchronized void registerRegistry() {
+ if (!registryInitialized) {
+ registryInitialized = true;
+ VariantRegistry.registerAll();
+ ISerializableRegistry.registerAll();
+ }
+ }
+}
diff --git a/gluten-flink/pom.xml b/gluten-flink/pom.xml
new file mode 100644
index 000000000000..124049897751
--- /dev/null
+++ b/gluten-flink/pom.xml
@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.gluten</groupId>
+    <artifactId>gluten-parent</artifactId>
+    <version>1.4.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>gluten-flink</artifactId>
+  <name>Gluten Flink</name>
+  <packaging>pom</packaging>
+
+  <modules>
+    <module>planner</module>
+    <module>loader</module>
+    <module>runtime</module>
+  </modules>
+
+  <properties>
+    <flink.version>1.20.0</flink.version>
+    <velox4j.version>0.1.0-SNAPSHOT</velox4j.version>
+  </properties>
+</project>
diff --git a/gluten-flink/runtime/pom.xml b/gluten-flink/runtime/pom.xml
new file mode 100644
index 000000000000..1454e09a0904
--- /dev/null
+++ b/gluten-flink/runtime/pom.xml
@@ -0,0 +1,63 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.gluten</groupId>
+    <artifactId>gluten-flink</artifactId>
+    <version>1.4.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>gluten-flink-runtime</artifactId>
+  <name>Gluten Flink Runtime</name>
+  <packaging>jar</packaging>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-streaming-java</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.flink</groupId>
+      <artifactId>flink-table-runtime</artifactId>
+      <version>${flink.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.github.zhztheplayer</groupId>
+      <artifactId>velox4j</artifactId>
+      <version>${velox4j.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.arrow</groupId>
+      <artifactId>arrow-memory-core</artifactId>
+      <version>18.1.0</version>
+      <scope>compile</scope>
+    </dependency>
+  </dependencies>
+</project>
diff --git a/gluten-flink/runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/gluten-flink/runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
new file mode 100644
index 000000000000..101ec7737153
--- /dev/null
+++ b/gluten-flink/runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -0,0 +1,823 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JMXServerOptions;
+import org.apache.flink.configuration.RpcOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptionsInternal;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.plugin.PluginManager;
+import org.apache.flink.core.plugin.PluginUtils;
+import org.apache.flink.core.security.FlinkSecurityManager;
+import org.apache.flink.management.jmx.JMXService;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.blob.BlobCacheService;
+import org.apache.flink.runtime.blob.BlobUtils;
+import org.apache.flink.runtime.blob.TaskExecutorBlobService;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.entrypoint.ClusterEntrypointUtils;
+import org.apache.flink.runtime.entrypoint.DeterminismEnvelope;
+import org.apache.flink.runtime.entrypoint.FlinkParseException;
+import org.apache.flink.runtime.entrypoint.WorkingDirectory;
+import org.apache.flink.runtime.externalresource.ExternalResourceInfoProvider;
+import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTrackerImpl;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalException;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.ReporterSetup;
+import org.apache.flink.runtime.metrics.TraceReporterSetup;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.util.MetricUtils;
+import org.apache.flink.runtime.rpc.AddressResolution;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcSystem;
+import org.apache.flink.runtime.rpc.RpcSystemUtils;
+import org.apache.flink.runtime.rpc.RpcUtils;
+import org.apache.flink.runtime.security.SecurityConfiguration;
+import org.apache.flink.runtime.security.SecurityUtils;
+import org.apache.flink.runtime.security.token.DelegationTokenReceiverRepository;
+import org.apache.flink.runtime.state.changelog.StateChangelogStorageLoader;
+import org.apache.flink.runtime.taskmanager.MemoryLogger;
+import org.apache.flink.runtime.util.ConfigurationParserUtils;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.util.Hardware;
+import org.apache.flink.runtime.util.JvmShutdownSafeguard;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
+import org.apache.flink.runtime.util.SignalHandler;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.ExecutorUtils;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Reference;
+import org.apache.flink.util.ShutdownHookUtil;
+import org.apache.flink.util.StringUtils;
+import org.apache.flink.util.TaskManagerExceptionUtils;
+import org.apache.flink.util.concurrent.ExecutorThreadFactory;
+import org.apache.flink.util.concurrent.FutureUtils;
+import org.apache.flink.util.function.FunctionUtils;
+
+import io.github.zhztheplayer.velox4j.Velox4j;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.InetAddress;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class is the executable entry point for the task manager in yarn or standalone mode. It
+ * constructs the related components (network, I/O manager, memory manager, RPC service, HA service)
+ * and starts them.
+ */
+public class TaskManagerRunner implements FatalErrorHandler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TaskManagerRunner.class);
+
+ private static final long FATAL_ERROR_SHUTDOWN_TIMEOUT_MS = 10000L;
+
+ private static final int SUCCESS_EXIT_CODE = 0;
+ @VisibleForTesting public static final int FAILURE_EXIT_CODE = 1;
+
+ private final Thread shutdownHook;
+
+ private final Object lock = new Object();
+
+ private final Configuration configuration;
+
+ private final Time timeout;
+
+ private final PluginManager pluginManager;
+
+ private final TaskExecutorServiceFactory taskExecutorServiceFactory;
+
+    private final CompletableFuture<Result> terminationFuture;
+
+ @GuardedBy("lock")
+    private DeterminismEnvelope<ResourceID> resourceId;
+
+ /** Executor used to run future callbacks. */
+ @GuardedBy("lock")
+ private ExecutorService executor;
+
+ @GuardedBy("lock")
+ private RpcSystem rpcSystem;
+
+ @GuardedBy("lock")
+ private RpcService rpcService;
+
+ @GuardedBy("lock")
+ private HighAvailabilityServices highAvailabilityServices;
+
+ @GuardedBy("lock")
+ private MetricRegistryImpl metricRegistry;
+
+ @GuardedBy("lock")
+ private BlobCacheService blobCacheService;
+
+ @GuardedBy("lock")
+    private DeterminismEnvelope<WorkingDirectory> workingDirectory;
+
+ @GuardedBy("lock")
+ private TaskExecutorService taskExecutorService;
+
+ @GuardedBy("lock")
+ private boolean shutdown;
+
+ public TaskManagerRunner(
+ Configuration configuration,
+ PluginManager pluginManager,
+ TaskExecutorServiceFactory taskExecutorServiceFactory)
+ throws Exception {
+ this.configuration = checkNotNull(configuration);
+ this.pluginManager = checkNotNull(pluginManager);
+ this.taskExecutorServiceFactory = checkNotNull(taskExecutorServiceFactory);
+
+ timeout = Time.fromDuration(configuration.get(RpcOptions.ASK_TIMEOUT_DURATION));
+
+ this.terminationFuture = new CompletableFuture<>();
+ this.shutdown = false;
+
+ this.shutdownHook =
+ ShutdownHookUtil.addShutdownHook(
+ () -> this.closeAsync(Result.JVM_SHUTDOWN).join(),
+ getClass().getSimpleName(),
+ LOG);
+ }
+
+ private void startTaskManagerRunnerServices() throws Exception {
+ synchronized (lock) {
+ rpcSystem = RpcSystem.load(configuration);
+
+ this.executor =
+ Executors.newScheduledThreadPool(
+ Hardware.getNumberCPUCores(),
+ new ExecutorThreadFactory("taskmanager-future"));
+
+ highAvailabilityServices =
+ HighAvailabilityServicesUtils.createHighAvailabilityServices(
+ configuration,
+ executor,
+ AddressResolution.NO_ADDRESS_RESOLUTION,
+ rpcSystem,
+ this);
+
+ JMXService.startInstance(configuration.get(JMXServerOptions.JMX_SERVER_PORT));
+
+ rpcService = createRpcService(configuration, highAvailabilityServices, rpcSystem);
+
+ this.resourceId =
+ getTaskManagerResourceID(
+ configuration, rpcService.getAddress(), rpcService.getPort());
+
+ this.workingDirectory =
+ ClusterEntrypointUtils.createTaskManagerWorkingDirectory(
+ configuration, resourceId);
+
+ LOG.info("Using working directory: {}", workingDirectory);
+
+ HeartbeatServices heartbeatServices =
+ HeartbeatServices.fromConfiguration(configuration);
+
+ metricRegistry =
+ new MetricRegistryImpl(
+ MetricRegistryConfiguration.fromConfiguration(
+ configuration,
+ rpcSystem.getMaximumMessageSizeInBytes(configuration)),
+ ReporterSetup.fromConfiguration(configuration, pluginManager),
+ TraceReporterSetup.fromConfiguration(configuration, pluginManager));
+
+ final RpcService metricQueryServiceRpcService =
+ MetricUtils.startRemoteMetricsRpcService(
+ configuration,
+ rpcService.getAddress(),
+ configuration.get(TaskManagerOptions.BIND_HOST),
+ rpcSystem);
+ metricRegistry.startQueryService(metricQueryServiceRpcService, resourceId.unwrap());
+
+ blobCacheService =
+ BlobUtils.createBlobCacheService(
+ configuration,
+ Reference.borrowed(workingDirectory.unwrap().getBlobStorageDirectory()),
+ highAvailabilityServices.createBlobStore(),
+ null);
+
+ final ExternalResourceInfoProvider externalResourceInfoProvider =
+ ExternalResourceUtils.createStaticExternalResourceInfoProviderFromConfig(
+ configuration, pluginManager);
+
+ final DelegationTokenReceiverRepository delegationTokenReceiverRepository =
+ new DelegationTokenReceiverRepository(configuration, pluginManager);
+
+ taskExecutorService =
+ taskExecutorServiceFactory.createTaskExecutor(
+ this.configuration,
+ this.resourceId.unwrap(),
+ rpcService,
+ highAvailabilityServices,
+ heartbeatServices,
+ metricRegistry,
+ blobCacheService,
+ false,
+ externalResourceInfoProvider,
+ workingDirectory.unwrap(),
+ this,
+ delegationTokenReceiverRepository);
+
+ handleUnexpectedTaskExecutorServiceTermination();
+
+ MemoryLogger.startIfConfigured(
+ LOG, configuration, terminationFuture.thenAccept(ignored -> {}));
+ }
+ }
+
+ @GuardedBy("lock")
+ private void handleUnexpectedTaskExecutorServiceTermination() {
+ taskExecutorService
+ .getTerminationFuture()
+ .whenComplete(
+ (unused, throwable) -> {
+ synchronized (lock) {
+ if (!shutdown) {
+ onFatalError(
+ new FlinkException(
+ "Unexpected termination of the TaskExecutor.",
+ throwable));
+ }
+ }
+ });
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Lifecycle management
+ // --------------------------------------------------------------------------------------------
+
+ public void start() throws Exception {
+ synchronized (lock) {
+ startTaskManagerRunnerServices();
+ taskExecutorService.start();
+ }
+ }
+
+ public void close() throws Exception {
+ try {
+ closeAsync().get();
+ } catch (ExecutionException e) {
+ ExceptionUtils.rethrowException(ExceptionUtils.stripExecutionException(e));
+ }
+ }
+
+    public CompletableFuture<Result> closeAsync() {
+ return closeAsync(Result.SUCCESS);
+ }
+
+    private CompletableFuture<Result> closeAsync(Result terminationResult) {
+ synchronized (lock) {
+ // remove shutdown hook to prevent resource leaks
+ ShutdownHookUtil.removeShutdownHook(shutdownHook, this.getClass().getSimpleName(), LOG);
+
+ if (shutdown) {
+ return terminationFuture;
+ }
+
+            final CompletableFuture<Void> taskManagerTerminationFuture;
+ if (taskExecutorService != null) {
+ taskManagerTerminationFuture = taskExecutorService.closeAsync();
+ } else {
+ taskManagerTerminationFuture = FutureUtils.completedVoidFuture();
+ }
+
+            final CompletableFuture<Void> serviceTerminationFuture =
+ FutureUtils.composeAfterwards(
+ taskManagerTerminationFuture, this::shutDownServices);
+
+            final CompletableFuture<Void> workingDirCleanupFuture =
+ FutureUtils.runAfterwards(
+ serviceTerminationFuture, () -> deleteWorkingDir(terminationResult));
+
+            final CompletableFuture<Void> rpcSystemClassLoaderCloseFuture;
+
+ if (rpcSystem != null) {
+ rpcSystemClassLoaderCloseFuture =
+ FutureUtils.runAfterwards(workingDirCleanupFuture, rpcSystem::close);
+ } else {
+ rpcSystemClassLoaderCloseFuture = FutureUtils.completedVoidFuture();
+ }
+
+ rpcSystemClassLoaderCloseFuture.whenComplete(
+ (Void ignored, Throwable throwable) -> {
+ if (throwable != null) {
+ terminationFuture.completeExceptionally(throwable);
+ } else {
+ terminationFuture.complete(terminationResult);
+ }
+ });
+
+ shutdown = true;
+ return terminationFuture;
+ }
+ }
+
+ private void deleteWorkingDir(Result terminationResult) throws IOException {
+ synchronized (lock) {
+ if (workingDirectory != null) {
+ if (!workingDirectory.isDeterministic() || terminationResult == Result.SUCCESS) {
+ workingDirectory.unwrap().delete();
+ }
+ }
+ }
+ }
+
+    private CompletableFuture<Void> shutDownServices() {
+ synchronized (lock) {
+            Collection<CompletableFuture<Void>> terminationFutures = new ArrayList<>(3);
+ Exception exception = null;
+
+ try {
+ JMXService.stopInstance();
+ } catch (Exception e) {
+ exception = ExceptionUtils.firstOrSuppressed(e, exception);
+ }
+
+ if (blobCacheService != null) {
+ try {
+ blobCacheService.close();
+ } catch (Exception e) {
+ exception = ExceptionUtils.firstOrSuppressed(e, exception);
+ }
+ }
+
+ if (metricRegistry != null) {
+ try {
+ terminationFutures.add(metricRegistry.closeAsync());
+ } catch (Exception e) {
+ exception = ExceptionUtils.firstOrSuppressed(e, exception);
+ }
+ }
+
+ if (highAvailabilityServices != null) {
+ try {
+ highAvailabilityServices.close();
+ } catch (Exception e) {
+ exception = ExceptionUtils.firstOrSuppressed(e, exception);
+ }
+ }
+
+ if (rpcService != null) {
+ terminationFutures.add(rpcService.closeAsync());
+ }
+
+ if (executor != null) {
+ terminationFutures.add(
+ ExecutorUtils.nonBlockingShutdown(
+ timeout.toMilliseconds(), TimeUnit.MILLISECONDS, executor));
+ }
+
+ if (exception != null) {
+ terminationFutures.add(FutureUtils.completedExceptionally(exception));
+ }
+
+ return FutureUtils.completeAll(terminationFutures);
+ }
+ }
+
+    // Expose the termination future so callers can tell when the runner has terminated.
+    public CompletableFuture<Result> getTerminationFuture() {
+ return terminationFuture;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // FatalErrorHandler methods
+ // --------------------------------------------------------------------------------------------
+
+ @Override
+ public void onFatalError(Throwable exception) {
+ TaskManagerExceptionUtils.tryEnrichTaskManagerError(exception);
+ LOG.error(
+ "Fatal error occurred while executing the TaskManager. Shutting it down...",
+ exception);
+
+ if (ExceptionUtils.isJvmFatalOrOutOfMemoryError(exception)) {
+ terminateJVM();
+ } else {
+ closeAsync(Result.FAILURE);
+
+ FutureUtils.orTimeout(
+ terminationFuture,
+ FATAL_ERROR_SHUTDOWN_TIMEOUT_MS,
+ TimeUnit.MILLISECONDS,
+ String.format(
+ "Waiting for TaskManager shutting down timed out after %s ms.",
+ FATAL_ERROR_SHUTDOWN_TIMEOUT_MS));
+ }
+ }
+
+ private void terminateJVM() {
+ FlinkSecurityManager.forceProcessExit(FAILURE_EXIT_CODE);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Static entry point
+ // --------------------------------------------------------------------------------------------
+
+ public static void main(String[] args) throws Exception {
+ // startup checks and logging
+ EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args);
+ SignalHandler.register(LOG);
+ JvmShutdownSafeguard.installAsShutdownHook(LOG);
+
+ long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit();
+
+ if (maxOpenFileHandles != -1L) {
+ LOG.info("Maximum number of open file descriptors is {}.", maxOpenFileHandles);
+ } else {
+ LOG.info("Cannot determine the maximum number of open file descriptors");
+ }
+
+ runTaskManagerProcessSecurely(args);
+ }
+
+ public static Configuration loadConfiguration(String[] args) throws FlinkParseException {
+ return ConfigurationParserUtils.loadCommonConfiguration(
+ args, TaskManagerRunner.class.getSimpleName());
+ }
+
+ public static int runTaskManager(Configuration configuration, PluginManager pluginManager)
+ throws Exception {
+ final TaskManagerRunner taskManagerRunner;
+
+ try {
+ taskManagerRunner =
+ new TaskManagerRunner(
+ configuration,
+ pluginManager,
+ TaskManagerRunner::createTaskExecutorService);
+ taskManagerRunner.start();
+ } catch (Exception exception) {
+ throw new FlinkException("Failed to start the TaskManagerRunner.", exception);
+ }
+
+ try {
+ return taskManagerRunner.getTerminationFuture().get().getExitCode();
+ } catch (Throwable t) {
+ throw new FlinkException(
+ "Unexpected failure during runtime of TaskManagerRunner.",
+ ExceptionUtils.stripExecutionException(t));
+ }
+ }
+
+ public static void runTaskManagerProcessSecurely(String[] args) {
+ Configuration configuration = null;
+
+ try {
+ configuration = loadConfiguration(args);
+ } catch (FlinkParseException fpe) {
+ LOG.error("Could not load the configuration.", fpe);
+ System.exit(FAILURE_EXIT_CODE);
+ }
+
+ runTaskManagerProcessSecurely(checkNotNull(configuration));
+ }
+
+ public static void runTaskManagerProcessSecurely(Configuration configuration) {
+ FlinkSecurityManager.setFromConfiguration(configuration);
+ final PluginManager pluginManager =
+ PluginUtils.createPluginManagerFromRootFolder(configuration);
+ FileSystem.initialize(configuration, pluginManager);
+
+ StateChangelogStorageLoader.initialize(pluginManager);
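+        // Gluten: initialize the Velox4j native runtime before the task manager services start
+        // (the addition relative to Flink's original TaskManagerRunner).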
+ Velox4j.initialize();
+
+ int exitCode;
+ Throwable throwable = null;
+
+ ClusterEntrypointUtils.configureUncaughtExceptionHandler(configuration);
+ try {
+ SecurityUtils.install(new SecurityConfiguration(configuration));
+
+ exitCode =
+ SecurityUtils.getInstalledContext()
+ .runSecured(() -> runTaskManager(configuration, pluginManager));
+ } catch (Throwable t) {
+ throwable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class);
+ exitCode = FAILURE_EXIT_CODE;
+ }
+
+ if (throwable != null) {
+ LOG.error("Terminating TaskManagerRunner with exit code {}.", exitCode, throwable);
+ } else {
+ LOG.info("Terminating TaskManagerRunner with exit code {}.", exitCode);
+ }
+
+ System.exit(exitCode);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Static utilities
+ // --------------------------------------------------------------------------------------------
+
+ public static TaskExecutorService createTaskExecutorService(
+ Configuration configuration,
+ ResourceID resourceID,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
+ MetricRegistry metricRegistry,
+ BlobCacheService blobCacheService,
+ boolean localCommunicationOnly,
+ ExternalResourceInfoProvider externalResourceInfoProvider,
+ WorkingDirectory workingDirectory,
+ FatalErrorHandler fatalErrorHandler,
+ DelegationTokenReceiverRepository delegationTokenReceiverRepository)
+ throws Exception {
+
+ final TaskExecutor taskExecutor =
+ startTaskManager(
+ configuration,
+ resourceID,
+ rpcService,
+ highAvailabilityServices,
+ heartbeatServices,
+ metricRegistry,
+ blobCacheService,
+ localCommunicationOnly,
+ externalResourceInfoProvider,
+ workingDirectory,
+ fatalErrorHandler,
+ delegationTokenReceiverRepository);
+
+ return TaskExecutorToServiceAdapter.createFor(taskExecutor);
+ }
+
+ public static TaskExecutor startTaskManager(
+ Configuration configuration,
+ ResourceID resourceID,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
+ MetricRegistry metricRegistry,
+ TaskExecutorBlobService taskExecutorBlobService,
+ boolean localCommunicationOnly,
+ ExternalResourceInfoProvider externalResourceInfoProvider,
+ WorkingDirectory workingDirectory,
+ FatalErrorHandler fatalErrorHandler,
+ DelegationTokenReceiverRepository delegationTokenReceiverRepository)
+ throws Exception {
+
+ checkNotNull(configuration);
+ checkNotNull(resourceID);
+ checkNotNull(rpcService);
+ checkNotNull(highAvailabilityServices);
+
+ LOG.info("Starting TaskManager with ResourceID: {}", resourceID.getStringWithMetadata());
+
+ SystemOutRedirectionUtils.redirectSystemOutAndError(configuration);
+
+ String externalAddress = rpcService.getAddress();
+
+ final TaskExecutorResourceSpec taskExecutorResourceSpec =
+ TaskExecutorResourceUtils.resourceSpecFromConfig(configuration);
+
+ TaskManagerServicesConfiguration taskManagerServicesConfiguration =
+ TaskManagerServicesConfiguration.fromConfiguration(
+ configuration,
+ resourceID,
+ externalAddress,
+ localCommunicationOnly,
+ taskExecutorResourceSpec,
+ workingDirectory);
+
+        Tuple2<TaskManagerMetricGroup, MetricGroup> taskManagerMetricGroup =
+ MetricUtils.instantiateTaskManagerMetricGroup(
+ metricRegistry,
+ externalAddress,
+ resourceID,
+ taskManagerServicesConfiguration.getSystemResourceMetricsProbingInterval());
+
+ final ExecutorService ioExecutor =
+ Executors.newFixedThreadPool(
+ taskManagerServicesConfiguration.getNumIoThreads(),
+ new ExecutorThreadFactory("flink-taskexecutor-io"));
+
+ TaskManagerServices taskManagerServices =
+ TaskManagerServices.fromConfiguration(
+ taskManagerServicesConfiguration,
+ taskExecutorBlobService.getPermanentBlobService(),
+ taskManagerMetricGroup.f1,
+ ioExecutor,
+ rpcService.getScheduledExecutor(),
+ fatalErrorHandler,
+ workingDirectory);
+
+ MetricUtils.instantiateFlinkMemoryMetricGroup(
+ taskManagerMetricGroup.f1,
+ taskManagerServices.getTaskSlotTable(),
+ taskManagerServices::getManagedMemorySize);
+
+ TaskManagerConfiguration taskManagerConfiguration =
+ TaskManagerConfiguration.fromConfiguration(
+ configuration,
+ taskExecutorResourceSpec,
+ externalAddress,
+ workingDirectory.getTmpDirectory());
+
+ String metricQueryServiceAddress = metricRegistry.getMetricQueryServiceGatewayRpcAddress();
+
+ return new TaskExecutor(
+ rpcService,
+ taskManagerConfiguration,
+ highAvailabilityServices,
+ taskManagerServices,
+ externalResourceInfoProvider,
+ heartbeatServices,
+ taskManagerMetricGroup.f0,
+ metricQueryServiceAddress,
+ taskExecutorBlobService,
+ fatalErrorHandler,
+ new TaskExecutorPartitionTrackerImpl(taskManagerServices.getShuffleEnvironment()),
+ delegationTokenReceiverRepository);
+ }
+
+ /**
+     * Create an RPC service for the task manager.
+ *
+ * @param configuration The configuration for the TaskManager.
+ * @param haServices to use for the task manager hostname retrieval
+ */
+ @VisibleForTesting
+ static RpcService createRpcService(
+ final Configuration configuration,
+ final HighAvailabilityServices haServices,
+ final RpcSystem rpcSystem)
+ throws Exception {
+
+ checkNotNull(configuration);
+ checkNotNull(haServices);
+
+ return RpcUtils.createRemoteRpcService(
+ rpcSystem,
+ configuration,
+ determineTaskManagerBindAddress(configuration, haServices, rpcSystem),
+ configuration.get(TaskManagerOptions.RPC_PORT),
+ configuration.get(TaskManagerOptions.BIND_HOST),
+ configuration.getOptional(TaskManagerOptions.RPC_BIND_PORT));
+ }
+
+ private static String determineTaskManagerBindAddress(
+ final Configuration configuration,
+ final HighAvailabilityServices haServices,
+ RpcSystemUtils rpcSystemUtils)
+ throws Exception {
+
+ final String configuredTaskManagerHostname = configuration.get(TaskManagerOptions.HOST);
+
+ if (configuredTaskManagerHostname != null) {
+ LOG.info(
+ "Using configured hostname/address for TaskManager: {}.",
+ configuredTaskManagerHostname);
+ return configuredTaskManagerHostname;
+ } else {
+ return determineTaskManagerBindAddressByConnectingToResourceManager(
+ configuration, haServices, rpcSystemUtils);
+ }
+ }
+
+ private static String determineTaskManagerBindAddressByConnectingToResourceManager(
+ final Configuration configuration,
+ final HighAvailabilityServices haServices,
+ RpcSystemUtils rpcSystemUtils)
+ throws LeaderRetrievalException {
+
+ final Duration lookupTimeout = configuration.get(RpcOptions.LOOKUP_TIMEOUT_DURATION);
+
+ final InetAddress taskManagerAddress =
+ LeaderRetrievalUtils.findConnectingAddress(
+ haServices.getResourceManagerLeaderRetriever(),
+ lookupTimeout,
+ rpcSystemUtils);
+
+ LOG.info(
+ "TaskManager will use hostname/address '{}' ({}) for communication.",
+ taskManagerAddress.getHostName(),
+ taskManagerAddress.getHostAddress());
+
+ HostBindPolicy bindPolicy =
+ HostBindPolicy.fromString(configuration.get(TaskManagerOptions.HOST_BIND_POLICY));
+ return bindPolicy == HostBindPolicy.IP
+ ? taskManagerAddress.getHostAddress()
+ : taskManagerAddress.getHostName();
+ }
+
+ @VisibleForTesting
+    static DeterminismEnvelope<ResourceID> getTaskManagerResourceID(
+ Configuration config, String rpcAddress, int rpcPort) {
+
+ final String metadata =
+ config.get(TaskManagerOptionsInternal.TASK_MANAGER_RESOURCE_ID_METADATA, "");
+ return config.getOptional(TaskManagerOptions.TASK_MANAGER_RESOURCE_ID)
+ .map(
+ value ->
+ DeterminismEnvelope.deterministicValue(
+ new ResourceID(value, metadata)))
+ .orElseGet(
+ FunctionUtils.uncheckedSupplier(
+ () -> {
+ final String hostName =
+ InetAddress.getLocalHost().getHostName();
+ final String value =
+ StringUtils.isNullOrWhitespaceOnly(rpcAddress)
+ ? hostName
+ + "-"
+ + new AbstractID()
+ .toString()
+ .substring(0, 6)
+ : rpcAddress
+ + ":"
+ + rpcPort
+ + "-"
+ + new AbstractID()
+ .toString()
+ .substring(0, 6);
+ return DeterminismEnvelope.nondeterministicValue(
+ new ResourceID(value, metadata));
+ }));
+ }
+
+ /** Factory for {@link TaskExecutorService}. */
+ public interface TaskExecutorServiceFactory {
+ TaskExecutorService createTaskExecutor(
+ Configuration configuration,
+ ResourceID resourceID,
+ RpcService rpcService,
+ HighAvailabilityServices highAvailabilityServices,
+ HeartbeatServices heartbeatServices,
+ MetricRegistry metricRegistry,
+ BlobCacheService blobCacheService,
+ boolean localCommunicationOnly,
+ ExternalResourceInfoProvider externalResourceInfoProvider,
+ WorkingDirectory workingDirectory,
+ FatalErrorHandler fatalErrorHandler,
+ DelegationTokenReceiverRepository delegationTokenReceiverRepository)
+ throws Exception;
+ }
+
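+ /** Lifecycle contract of a running task executor: started once, then awaited via its termination future. */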
+ public interface TaskExecutorService extends AutoCloseableAsync {
+ void start();
+
+ CompletableFuture<Void> getTerminationFuture();
+ }
+
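+ /** Termination result of the TaskManager process, mapped to the process exit code. */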
+ public enum Result {
+ SUCCESS(SUCCESS_EXIT_CODE),
+ JVM_SHUTDOWN(FAILURE_EXIT_CODE),
+ FAILURE(FAILURE_EXIT_CODE);
+
+ private final int exitCode;
+
+ Result(int exitCode) {
+ this.exitCode = exitCode;
+ }
+
+ public int getExitCode() {
+ return exitCode;
+ }
+ }
+}
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenCalOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenCalOperator.java
new file mode 100644
index 000000000000..609bd987cab2
--- /dev/null
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenCalOperator.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gluten.table.runtime.operators;
+
+import io.github.zhztheplayer.velox4j.Velox4j;
+import io.github.zhztheplayer.velox4j.config.Config;
+import io.github.zhztheplayer.velox4j.config.ConnectorConfig;
+import io.github.zhztheplayer.velox4j.connector.ExternalStream;
+import io.github.zhztheplayer.velox4j.connector.ExternalStreamConnectorSplit;
+import io.github.zhztheplayer.velox4j.iterator.DownIterator;
+import io.github.zhztheplayer.velox4j.iterator.UpIterator;
+import io.github.zhztheplayer.velox4j.memory.AllocationListener;
+import io.github.zhztheplayer.velox4j.memory.MemoryManager;
+import io.github.zhztheplayer.velox4j.plan.PlanNode;
+import io.github.zhztheplayer.velox4j.query.BoundSplit;
+import io.github.zhztheplayer.velox4j.query.Query;
+import io.github.zhztheplayer.velox4j.serde.Serde;
+import io.github.zhztheplayer.velox4j.session.Session;
+import io.github.zhztheplayer.velox4j.type.RowType;
+
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.gluten.vectorized.FlinkRowToVLRowConvertor;
+import org.apache.gluten.vectorized.VLVectorIterator;
+
+import java.util.List;
+
+/** Calc operator in Gluten, which calls Velox to run the computation. */
+public class GlutenCalOperator extends TableStreamOperator<RowData>
+ implements OneInputStreamOperator<RowData, RowData>, GlutenOperator {
+
+ private final String glutenPlan;
+ private final String id;
+ private final String inputType;
+ private final String outputType;
+
+ private StreamRecord<RowData> outElement = null;
+
+ private Session session;
+ private Query query;
+ private VLVectorIterator inputIterator;
+ BufferAllocator allocator;
+
+ public GlutenCalOperator(String plan, String id, String inputType, String outputType) {
+ this.glutenPlan = plan;
+ this.id = id;
+ this.inputType = inputType;
+ this.outputType = outputType;
+ }
+
+ @Override
+ public void open() throws Exception {
+ super.open();
+ outElement = new StreamRecord<>(null);
+ session = Velox4j.newSession(MemoryManager.create(AllocationListener.NOOP));
+
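+ // Bind the row buffer as a Velox external stream, so rows received by this
+ // operator can be consumed by the Velox pipeline through a connector split.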
+ inputIterator = new VLVectorIterator();
+ ExternalStream es = session.externalStreamOps().bind(new DownIterator(inputIterator));
+ List<BoundSplit> splits = List.of(
+ new BoundSplit(
+ id,
+ -1,
+ new ExternalStreamConnectorSplit("connector-external-stream", es.id())));
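+ // Deserialize the JSON plan produced at planning time into a Velox PlanNode
+ // and assemble the executable query together with the external-stream split.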
+ PlanNode filter = Serde.fromJson(glutenPlan, PlanNode.class);
+ query = new Query(filter, splits, Config.empty(), ConnectorConfig.empty());
+ allocator = new RootAllocator(Long.MAX_VALUE);
+ }
+
+ @Override
+ public void processElement(StreamRecord<RowData> element) {
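+ // Convert the incoming Flink row into a single-row Velox RowVector and push
+ // it into the external stream backing the query.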
+ inputIterator.addRow(
+ FlinkRowToVLRowConvertor.fromRowData(
+ element.getValue(),
+ allocator,
+ session,
+ Serde.fromJson(inputType, RowType.class)));
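+ // Run the Velox query over the buffered input and emit any produced row;
+ // note that the query is executed once per input element.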
+ UpIterator result = session.queryOps().execute(query);
+ if (result.hasNext()) {
+ output.collect(
+ outElement.replace(
+ FlinkRowToVLRowConvertor.toRowData(
+ result.next(),
+ allocator,
+ session,
+ Serde.fromJson(outputType, RowType.class))));
+ }
+ }
+}
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOperator.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOperator.java
new file mode 100644
index 000000000000..9e8c6f012087
--- /dev/null
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/operators/GlutenOperator.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gluten.table.runtime.operators;
+
+/** Marker interface implemented by all Gluten operators. */
+public interface GlutenOperator {
+}
diff --git a/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLRowConvertor.java b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLRowConvertor.java
new file mode 100644
index 000000000000..beb52433c00b
--- /dev/null
+++ b/gluten-flink/runtime/src/main/java/org/apache/gluten/vectorized/FlinkRowToVLRowConvertor.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.vectorized;
+
+import io.github.zhztheplayer.velox4j.data.RowVector;
+import io.github.zhztheplayer.velox4j.session.Session;
+import io.github.zhztheplayer.velox4j.type.BigIntType;
+import io.github.zhztheplayer.velox4j.type.IntegerType;
+import io.github.zhztheplayer.velox4j.type.RowType;
+import io.github.zhztheplayer.velox4j.type.Type;
+import io.github.zhztheplayer.velox4j.type.VarCharType;
+import org.apache.arrow.memory.BufferAllocator;
+import org.apache.arrow.vector.BigIntVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.table.Table;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Converter between Velox RowVector and Flink RowData. */
+public class FlinkRowToVLRowConvertor {
+
+ public static RowVector fromRowData(
+ RowData row,
+ BufferAllocator allocator,
+ Session session,
+ RowType rowType) {
+ // TODO: support more types
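+ // Each Flink row becomes a one-row Arrow table: every field is written into a
+ // single-element Arrow vector of the matching type.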
+ List<FieldVector> arrowVectors = new ArrayList<>(rowType.size());
+ for (int i = 0; i < rowType.size(); i++) {
+ Type fieldType = rowType.getChildren().get(i);
+ if (fieldType instanceof IntegerType) {
+ IntVector intVector = new IntVector(rowType.getNames().get(i), allocator);
+ intVector.setSafe(0, row.getInt(i));
+ intVector.setValueCount(1);
+ arrowVectors.add(i, intVector);
+ } else if (fieldType instanceof BigIntType) {
+ BigIntVector bigIntVector = new BigIntVector(rowType.getNames().get(i), allocator);
+ bigIntVector.setSafe(0, row.getLong(i));
+ bigIntVector.setValueCount(1);
+ arrowVectors.add(i, bigIntVector);
+ } else if (fieldType instanceof VarCharType) {
+ VarCharVector stringVector = new VarCharVector(rowType.getNames().get(i), allocator);
+ stringVector.setSafe(0, row.getString(i).toBytes());
+ stringVector.setValueCount(1);
+ arrowVectors.add(i, stringVector);
+ } else {
+ throw new RuntimeException("Unsupported field type: " + fieldType);
+ }
+ }
+ return session.arrowOps().fromArrowTable(allocator, new Table(arrowVectors));
+ }
+
+ public static RowData toRowData(
+ RowVector rowVector,
+ BufferAllocator allocator,
+ Session session,
+ RowType rowType) {
+ // TODO: support more types
+ FieldVector fieldVector = session.arrowOps().toArrowVector(
+ allocator,
+ rowVector.loadedVector());
+ List