Example run pass and add docs
shuai-xu committed Mar 6, 2025
1 parent 974b972 commit 4a15f3f
Showing 10 changed files with 328 additions and 49 deletions.
97 changes: 97 additions & 0 deletions docs/get-started/Flink.md
@@ -0,0 +1,97 @@
---
layout: page
title: Gluten For Flink with Velox Backend
nav_order: 1
parent: Getting-Started
---

# Supported Version

| Type | Version |
|-------|------------------------------|
| Flink | 1.20 |
| OS | Ubuntu20.04/22.04, Centos7/8 |
| jdk | openjdk11/jdk17 |
| scala | 2.12 |

# Prerequisite

Currently, with a static build, the Gluten+Flink+Velox backend supports all Linux distributions, but it has only been tested on **Ubuntu20.04/Ubuntu22.04/Centos7/Centos8**. With a dynamic build, the Gluten+Velox backend supports **Ubuntu20.04/Ubuntu22.04/Centos7/Centos8** and their variants.

Currently, the officially supported Flink versions are 1.20.*.

Set the `JAVA_HOME` environment variable. Currently, Gluten supports **Java 11** and **Java 17**.

**For x86_64**

```bash
## make sure jdk11 is used
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
export PATH=$JAVA_HOME/bin:$PATH
```

**For aarch64**

```bash
## make sure jdk11 is used
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-arm64
export PATH=$JAVA_HOME/bin:$PATH
```

**Get gluten**

```bash
## configure maven (for example, a proxy) in ~/.m2/settings.xml

## fetch gluten code
git clone https://github.com/apache/incubator-gluten.git
```

# Build Gluten Flink with Velox Backend

```
cd /path/to/gluten/gluten-flink
mvn clean package
```

## Dependency library deployment

Gluten for Flink depends on [Velox4j](https://github.com/velox4j/velox4j) to call Velox, so you need to obtain the Velox4j packages and use them with Gluten.
The Velox4j jar currently available is velox4j-0.1.0-SNAPSHOT.jar.

## Submit the Flink SQL job

Submit the test job with `flink run`. You can use `StreamSQLExample` as an example.

### Flink local cluster
```
var parquet_file_path = "/PATH/TO/TPCH_PARQUET_PATH"
var gluten_root = "/PATH/TO/GLUTEN"
```

After deploying the Flink binaries, add the Gluten jars to the Flink library path,
including gluten-flink-runtime-1.4.0.jar, gluten-flink-loader-1.4.0.jar, and the Velox4j jars above,
and make sure they are loaded before the Flink libraries.
Then go to the Flink binary path and use the script below to
submit the example job.

```bash
bin/start-cluster.sh
bin/flink run -d -m 0.0.0.0:8080 \
-c org.apache.flink.table.examples.java.basics.StreamSQLExample \
lib/flink-examples-table_2.12-1.20.1.jar
```

The result is written to `log/flink-*-taskexecutor-*.out`.
The web frontend of your Flink job also shows an operator named `gluten-cal`.

### Flink Yarn per job mode

TODO

## Notes:
Neither Gluten for Flink nor Velox4j currently provides a bundled jar that includes all of its dependencies.
So you may have to add these jars yourself, which may include guava-33.4.0-jre.jar, jackson-core-2.18.0.jar,
jackson-databind-2.18.0.jar, jackson-datatype-jdk8-2.18.0.jar, jackson-annotations-2.18.0.jar, arrow-memory-core-18.1.0.jar,
arrow-memory-unsafe-18.1.0.jar, arrow-vector-18.1.0.jar, flatbuffers-java-24.3.25.jar, arrow-format-18.1.0.jar, arrow-c-data-18.1.0.jar.
We will supply bundled jars soon.
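
Until bundled jars exist, it may help to check that every jar listed above is present in Flink's `lib/` directory before starting the cluster. The following is a minimal sketch, not part of Gluten: the class `LibCheck` and the method `missingJars` are hypothetical names, and the jar list is copied from the notes above, so adjust versions to match your build.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Gluten): reports which expected jars are absent. */
final class LibCheck {

  // Jar names copied from the notes above; versions are assumptions about your build.
  static final List<String> REQUIRED = List.of(
      "velox4j-0.1.0-SNAPSHOT.jar",
      "guava-33.4.0-jre.jar",
      "jackson-core-2.18.0.jar",
      "jackson-databind-2.18.0.jar",
      "jackson-datatype-jdk8-2.18.0.jar",
      "jackson-annotations-2.18.0.jar",
      "arrow-memory-core-18.1.0.jar",
      "arrow-memory-unsafe-18.1.0.jar",
      "arrow-vector-18.1.0.jar",
      "flatbuffers-java-24.3.25.jar",
      "arrow-format-18.1.0.jar",
      "arrow-c-data-18.1.0.jar");

  /** Returns the names in {@code required} that are not regular files under {@code libDir}. */
  static List<String> missingJars(Path libDir, List<String> required) {
    List<String> missing = new ArrayList<>();
    for (String jar : required) {
      if (!Files.isRegularFile(libDir.resolve(jar))) {
        missing.add(jar);
      }
    }
    return missing;
  }

  public static void main(String[] args) {
    // Defaults to ./lib, which assumes the program runs from the Flink binary path.
    Path libDir = Path.of(args.length > 0 ? args[0] : "lib");
    List<String> missing = missingJars(libDir, REQUIRED);
    if (missing.isEmpty()) {
      System.out.println("All listed jars found in " + libDir);
    } else {
      missing.forEach(jar -> System.out.println("Missing: " + jar));
    }
  }
}
```

The check only looks at file names, so it does not verify load order or version compatibility.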
2 changes: 2 additions & 0 deletions gluten-flink/README.md
@@ -0,0 +1,2 @@
# Gluten Flink Project
Gluten for Flink is under development. Refer to the [user guide](../docs/get-started/Flink.md) for a quick start.
@@ -17,13 +17,19 @@

package org.apache.flink.table.planner.plan.nodes.exec.stream;

import io.github.zhztheplayer.velox4j.plan.ValuesNode;
import org.apache.gluten.rexnode.Utils;
import org.apache.gluten.rexnode.LogicalTypeConverter;
import org.apache.gluten.rexnode.RexNodeConverter;
import org.apache.gluten.table.runtime.operators.GlutenCalOperator;

import io.github.zhztheplayer.velox4j.connector.ExternalStreamTableHandle;
import io.github.zhztheplayer.velox4j.expression.TypedExpr;
import io.github.zhztheplayer.velox4j.plan.FilterNode;
import io.github.zhztheplayer.velox4j.plan.PlanNode;
import io.github.zhztheplayer.velox4j.plan.ProjectNode;
import io.github.zhztheplayer.velox4j.plan.TableScanNode;
import io.github.zhztheplayer.velox4j.serde.Serde;
import io.github.zhztheplayer.velox4j.type.Type;
import org.apache.calcite.rex.RexNode;
import org.apache.flink.FlinkVersion;
import org.apache.flink.api.dag.Transformation;
@@ -107,21 +113,31 @@ public Transformation<RowData> translateToPlanInternal(
(Transformation<RowData>) inputEdge.translateToPlan(planner);

// add a mock input because velox does not allow an empty source.
PlanNode mockInput = new ValuesNode(
// TODO: remove it.
Type inputType = LogicalTypeConverter.toVLType(inputEdge.getOutputType());
List<String> inNames = Utils.getNamesFromRowType(inputEdge.getOutputType());
PlanNode mockInput = new TableScanNode(
String.valueOf(ExecNodeContext.newNodeId()),
"",
false,
1);
inputType,
new ExternalStreamTableHandle("connector-external-stream"),
List.of());
PlanNode filter = new FilterNode(
String.valueOf(getId()),
List.of(mockInput),
RexNodeConverter.toTypedExpr(condition));
RexNodeConverter.toTypedExpr(condition, inNames));
List<TypedExpr> projectExprs = RexNodeConverter.toTypedExpr(projection, inNames);
PlanNode project = new ProjectNode(
String.valueOf(ExecNodeContext.newNodeId()),
List.of(filter),
List.of("0"), // TODO: add project names
RexNodeConverter.toTypedExpr(projection));
final GlutenCalOperator calOperator = new GlutenCalOperator(project);
Utils.getNamesFromRowType(getOutputType()),
projectExprs);
// TODO: velox4j does not support serializable now.
Utils.registerRegistry();
String plan = Serde.toJson(project);
String inputStr = Serde.toJson(inputType);
Type outputType = LogicalTypeConverter.toVLType(getOutputType());
String outputStr = Serde.toJson(outputType);
final GlutenCalOperator calOperator = new GlutenCalOperator(plan, mockInput.getId(), inputStr, outputStr);
return ExecNodeUtil.createOneInputTransformation(
inputTransform,
new TransformationMetadata("gluten-calc", "Gluten cal operator"),
@@ -19,13 +19,14 @@
import java.util.HashMap;
import java.util.Map;

/** Mapping of flink function and substrait function. */
/** Mapping of flink function and velox function. */
public class FunctionMappings {
// A map stores the relationship between flink function name and substrait function.
// A map stores the relationship between flink function name and velox function.
private static Map<String, String> functionMappings = new HashMap() {
{
put(">", "gt");
put("<", "lt");
// TODO: support more functions.
put(">", "greaterthan");
put("<", "lessthan");
}
};
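
The mapping above is built with a raw `HashMap` and double-brace initialization. As a minimal sketch of the same lookup with a typed, immutable map: the class `VeloxFunctionNames` and the method `toVeloxName` are hypothetical names, the two entries are the ones added in this commit, and throwing on an unmapped name is an assumption about the desired behavior.

```java
import java.util.Map;

/** Hypothetical sketch (not the commit's class): typed, immutable function-name mapping. */
final class VeloxFunctionNames {

  // Entries taken from the diff above; more functions would be added here.
  private static final Map<String, String> MAPPINGS = Map.of(
      ">", "greaterthan",
      "<", "lessthan");

  /** Returns the Velox function name for a Flink operator, failing fast if none is known. */
  static String toVeloxName(String flinkName) {
    String veloxName = MAPPINGS.get(flinkName);
    if (veloxName == null) {
      throw new IllegalArgumentException("No Velox mapping for Flink function: " + flinkName);
    }
    return veloxName;
  }
}
```

`Map.of` requires Java 9 or later, which fits the Java 11 and 17 targets named in the docs.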

@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.rexnode;

import io.github.zhztheplayer.velox4j.type.IntegerType;
import io.github.zhztheplayer.velox4j.type.Type;
import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

import java.util.List;
import java.util.stream.Collectors;

/** Convertor to convert Flink LogicalType to velox data Type */
public class LogicalTypeConverter {

public static Type toVLType(LogicalType logicalType) {
if (logicalType instanceof RowType) {
RowType flinkRowType = (RowType) logicalType;
List<Type> fieldTypes = flinkRowType.getChildren().stream().
map(LogicalTypeConverter::toVLType).
collect(Collectors.toList());
return new io.github.zhztheplayer.velox4j.type.RowType(
flinkRowType.getFieldNames(),
fieldTypes);
} else if (logicalType instanceof IntType) {
return new IntegerType();
} else if (logicalType instanceof BigIntType) {
return new io.github.zhztheplayer.velox4j.type.BigIntType();
} else if (logicalType instanceof VarCharType) {
return new io.github.zhztheplayer.velox4j.type.VarCharType();
} else {
throw new RuntimeException("Unsupported logical type: " + logicalType);
}
}
}
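
`toVLType` above recurses into row types and fails on anything it does not recognize. The sketch below shows that shape in isolation, using stand-in classes so it compiles without Flink or Velox4j. All names in it (`TypeConversionSketch`, `SourceType`, `RowSource`, and so on) are hypothetical, and it returns type names as strings rather than Velox4j `Type` objects.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in types for illustration only; these are not Flink or Velox4j classes. */
final class TypeConversionSketch {

  abstract static class SourceType {}
  static final class IntSource extends SourceType {}
  static final class BigIntSource extends SourceType {}
  static final class VarCharSource extends SourceType {}
  static final class DoubleSource extends SourceType {} // deliberately left unsupported

  static final class RowSource extends SourceType {
    final List<String> names;
    final List<SourceType> children;

    RowSource(List<String> names, List<SourceType> children) {
      this.names = names;
      this.children = children;
    }
  }

  /** Recursively maps a source type to a target type name, failing on unknown types. */
  static String convert(SourceType type) {
    if (type instanceof RowSource) {
      RowSource row = (RowSource) type;
      List<String> fields = new ArrayList<>();
      for (int i = 0; i < row.children.size(); i++) {
        // Each field is converted with the same function, so nested rows work too.
        fields.add(row.names.get(i) + " " + convert(row.children.get(i)));
      }
      return "ROW(" + String.join(", ", fields) + ")";
    } else if (type instanceof IntSource) {
      return "INTEGER";
    } else if (type instanceof BigIntSource) {
      return "BIGINT";
    } else if (type instanceof VarCharSource) {
      return "VARCHAR";
    }
    throw new UnsupportedOperationException(
        "Unsupported type: " + type.getClass().getSimpleName());
  }
}
```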
@@ -20,15 +20,16 @@
import io.github.zhztheplayer.velox4j.expression.ConstantTypedExpr;
import io.github.zhztheplayer.velox4j.expression.FieldAccessTypedExpr;
import io.github.zhztheplayer.velox4j.expression.TypedExpr;
import io.github.zhztheplayer.velox4j.type.BigIntType;
import io.github.zhztheplayer.velox4j.type.BooleanType;
import io.github.zhztheplayer.velox4j.type.IntegerType;
import io.github.zhztheplayer.velox4j.type.Type;
import io.github.zhztheplayer.velox4j.type.VarCharType;
import io.github.zhztheplayer.velox4j.variant.BigIntValue;
import io.github.zhztheplayer.velox4j.variant.BooleanValue;
import io.github.zhztheplayer.velox4j.variant.DoubleValue;
import io.github.zhztheplayer.velox4j.variant.IntegerValue;
import io.github.zhztheplayer.velox4j.variant.SmallIntValue;
import io.github.zhztheplayer.velox4j.variant.TimestampValue;
import io.github.zhztheplayer.velox4j.variant.TinyIntValue;
import io.github.zhztheplayer.velox4j.variant.VarBinaryValue;
import io.github.zhztheplayer.velox4j.variant.VarCharValue;
@@ -45,7 +46,7 @@
/** Convertor to convert RexNode to velox TypedExpr */
public class RexNodeConverter {

public static TypedExpr toTypedExpr(RexNode rexNode) {
public static TypedExpr toTypedExpr(RexNode rexNode, List<String> inNames) {
if (rexNode instanceof RexLiteral) {
RexLiteral literal = (RexLiteral) rexNode;
return new ConstantTypedExpr(
@@ -54,7 +55,7 @@ public static TypedExpr toTypedExpr(RexNode rexNode) {
null);
} else if (rexNode instanceof RexCall) {
RexCall rexCall = (RexCall) rexNode;
List<TypedExpr> params = toTypedExpr(rexCall.getOperands());
List<TypedExpr> params = toTypedExpr(rexCall.getOperands(), inNames);
Type nodeType = toType(rexCall.getType());
return new CallTypedExpr(
nodeType,
@@ -64,15 +65,15 @@ public static TypedExpr toTypedExpr(RexNode rexNode) {
RexInputRef inputRef = (RexInputRef) rexNode;
return FieldAccessTypedExpr.create(
toType(inputRef.getType()),
String.valueOf(inputRef.getIndex()));
inNames.get(inputRef.getIndex()));
} else {
throw new RuntimeException("Unrecognized RexNode: " + rexNode);
}
}

public static List<TypedExpr> toTypedExpr(List<RexNode> rexNodes) {
public static List<TypedExpr> toTypedExpr(List<RexNode> rexNodes, List<String> inNames) {
return rexNodes.stream()
.map(rexNode -> toTypedExpr(rexNode))
.map(rexNode -> toTypedExpr(rexNode, inNames))
.collect(Collectors.toList());
}

@@ -82,6 +83,10 @@ public static Type toType(RelDataType relDataType) {
return new BooleanType();
case INTEGER:
return new IntegerType();
case BIGINT:
return new BigIntType();
case VARCHAR:
return new VarCharType();
default:
throw new RuntimeException("Unsupported type: " + relDataType.getSqlTypeName());
}
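
The `inNames` parameter threaded through `toTypedExpr` above lets field accesses refer to input columns by name rather than by positional index. A minimal sketch of that recursion follows, using stand-in expression classes rather than Calcite or Velox4j types. The names `InputNameSketch`, `InputRef`, `Call`, and `render` are hypothetical, and the output is a plain string instead of a `TypedExpr`.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Stand-in expression tree for illustration; not Calcite's RexNode or Velox4j's TypedExpr. */
final class InputNameSketch {

  interface Expr {}

  /** Positional reference to an input column, analogous in spirit to an input ref. */
  static final class InputRef implements Expr {
    final int index;

    InputRef(int index) {
      this.index = index;
    }
  }

  /** Function call with operands, analogous in spirit to a call node. */
  static final class Call implements Expr {
    final String function;
    final List<Expr> operands;

    Call(String function, List<Expr> operands) {
      this.function = function;
      this.operands = operands;
    }
  }

  /** Renders an expression, resolving each input index to its column name. */
  static String render(Expr expr, List<String> inNames) {
    if (expr instanceof InputRef) {
      return inNames.get(((InputRef) expr).index);
    } else if (expr instanceof Call) {
      Call call = (Call) expr;
      // The same name list is passed down so nested calls resolve consistently.
      String args = call.operands.stream()
          .map(operand -> render(operand, inNames))
          .collect(Collectors.joining(", "));
      return call.function + "(" + args + ")";
    }
    throw new IllegalArgumentException("Unrecognized expression: " + expr);
  }
}
```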
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.rexnode;

import io.github.zhztheplayer.velox4j.serializable.ISerializableRegistry;
import io.github.zhztheplayer.velox4j.variant.VariantRegistry;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;

import java.util.List;

/** Utility to store some useful functions. */
public class Utils {

private static boolean registryInitialized = false;
// Get names for project node.
public static List<String> getNamesFromRowType(LogicalType logicalType) {
if (logicalType instanceof RowType) {
RowType rowType = (RowType) logicalType;
return rowType.getFieldNames();
} else {
throw new RuntimeException("Output type is not row type: " + logicalType);
}
}

// Init serialize related registries.
public static void registerRegistry() {
if (!registryInitialized) {
registryInitialized = true;
VariantRegistry.registerAll();
ISerializableRegistry.registerAll();
}
}
}
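
`registerRegistry` above guards initialization with a plain static boolean and sets the flag before the registries are populated. If planning can run on more than one thread (an assumption; the commit does not say), a second caller could see the flag and continue before registration has finished. One possible alternative is sketched below with a hypothetical class `RegistryInit` that takes the registration step as a `Runnable`, so it compiles without Velox4j.

```java
/** Hypothetical sketch (not the commit's Utils class): run an init step at most once. */
final class RegistryInit {

  private static boolean initialized = false;

  /**
   * Runs {@code registerAll} on the first call only. The method is synchronized, so
   * concurrent callers wait until registration has completed, and the flag is set only
   * after the step succeeds, so a failed attempt can be retried.
   *
   * @return true if this call performed the registration, false if it was already done
   */
  static synchronized boolean ensureInitialized(Runnable registerAll) {
    if (initialized) {
      return false;
    }
    registerAll.run();
    initialized = true;
    return true;
  }
}
```

In the commit's code, the `Runnable` would wrap the two `registerAll()` calls. The lock is uncontended after the first call, so the cost is small compared with plan translation.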