Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VL] The POC of supporting Flink in Gluten #8839

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

shuai-xu
Copy link
Contributor

@shuai-xu shuai-xu commented Feb 27, 2025

What changes were proposed in this pull request?

This pr is the Java side of POC to support Flink. It generates a GlutenCalOperator to run filter using native.
The draft design is here.

Copy link

Thanks for opening a pull request!

Could you open an issue for this pull request on Github Issues?

https://github.com/apache/incubator-gluten/issues

Then could you also rename commit message and pull request title in the following format?

[GLUTEN-${ISSUES_ID}][COMPONENT]feat/fix: ${detailed message}

See also:

@weiting-chen weiting-chen added the enhancement New feature or request label Feb 27, 2025
@weiting-chen weiting-chen changed the title The POC of supporting Flink in Gluten [VL] The POC of supporting Flink in Gluten Feb 27, 2025
@PHILO-HE
Copy link
Contributor

@shuai-xu, thanks for your great work! Could you draft a design doc? Google doc is preferred.

*/
package org.apache.gluten.backendsapi;

public class FlinkBackend {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth emphasizing that Flink will not be a backend in Gluten. It's more considered a frontend or a framework or so.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, We can discuss it.

@jinchengchenghh
Copy link
Contributor

Can we use gluten-substrait module? Looks like there exists too much duplicated code.

@FelixYBW
Copy link
Contributor

Thank you for the PR! It eventually starts

@majetideepak
Copy link
Collaborator

majetideepak commented Feb 27, 2025

Is there a design for this support for people who are not very familiar with Flink?

@weiting-chen
Copy link
Contributor

Is there a design for this support for people who are not very familiar with Flink?

This is an initial PR and require more people to join with us for reviewing the design.
For the background and current potential issues, please check the link: https://docs.google.com/document/d/1VNMs1oR0c02kuQLFLQXZeTyk3CoPjrtAfiKGoLGxGzE/edit?usp=sharing

@shuai-xu
Copy link
Contributor Author

Can we use gluten-substrait module? Looks like there exists too much duplicated code.

Yes, we need to discuss whether it need to translate to substrait plan or just call velox jni interface, if do need to translate to substrait plan, It need to reconstruct the gluten-substraint module a little to share it between gluten spark and flink.

@shuai-xu
Copy link
Contributor Author

@PHILO-HE @majetideepak OK, I will write a design soon.

@zhztheplayer
Copy link
Member

@PHILO-HE @majetideepak OK, I will write a design soon.

Hi @weiting-chen, would you share who is the author of https://docs.google.com/document/d/1VNMs1oR0c02kuQLFLQXZeTyk3CoPjrtAfiKGoLGxGzE/edit?usp=sharing?

I am opening a GitHub side discussion here: #8849

@weiting-chen
Copy link
Contributor

@PHILO-HE @majetideepak OK, I will write a design soon.

Hi @weiting-chen, would you share who is the author of https://docs.google.com/document/d/1VNMs1oR0c02kuQLFLQXZeTyk3CoPjrtAfiKGoLGxGzE/edit?usp=sharing?

I am opening a GitHub side discussion here: #8849

Yes, done to add the author.

}
}

private native long nativeProcessElement(int executor, long data);

This comment was marked as resolved.

@zjuwangg
Copy link
Contributor

zjuwangg commented Mar 3, 2025

Very glad to see such exciting progress on Flink support.
Looking forward on how to run the poc code in test...

@ParyshevSergey
Copy link

System.load("/home/xushuai/gluten/cpp-flink/build/libgflink.so");
How to get this library? Or which steps need to pass for compile it?

new BoundSplit(
"5",
-1,
new ExternalStreamConnectorSplit("escs1", es.id())));
Copy link
Member

@zhztheplayer zhztheplayer Mar 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would use connector-external-stream as connector ID.

It is currently a fixed value in Velox4j; https://github.com/velox4j/velox4j/blob/434ae37dfc3d5fb79788fe5bce41e41dd17901b5/src/main/cpp/main/velox4j/init/Init.cc#L94-L97

ExternalStream es = session.externalStreamOps().bind(new DownIterator(inputIterator));
List<BoundSplit> splits = List.of(
new BoundSplit(
"5",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The planNodeId is the same with leaf scan node's node ID so Velox knows we bind this split to the scan. Perhaps we can pass the scan node ID into GlutenCalOperator somehow?

Comment on lines +51 to +54
return new ConstantTypedExpr(
toType(literal.getType()),
toVariant(literal),
null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could use ConstantTypedExpr.create(toVariant(literal))

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The create will call native method, I think we'd better not call native in client side, so we need not load native libraries in Flink client side.

Comment on lines 109 to 114
// add a mock input as velox not allow the source is empty.
PlanNode mockInput = new ValuesNode(
String.valueOf(ExecNodeContext.newNodeId()),
"",
false,
1);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding a scan node so we can bind the split to it. E.g.,

final TableScanNode scanNode = new TableScanNode(
        "id-1",
        ...(type),
        new ExternalStreamTableHandle("connector-external-stream"),
        List.of()
    );

case VARCHAR:
return new VarCharValue(literal.getValue().toString());
case BINARY:
return new VarBinaryValue(literal.getValue().toString());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we use VarBinaryValue.create() to pass a byte array in?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

att

@ParyshevSergey
Copy link

After try to run this POC, got error
Caused by: io.github.zhztheplayer.velox4j.exception.VeloxException: Exception: VeloxUserError Error Source: USER Error Code: INVALID_ARGUMENT Reason: Splits can be associated only with leaf plan nodes which require splits. Plan node ID 5 doesn't refer to such plan node.

@github-actions github-actions bot added the DOCS label Mar 6, 2025
@shuai-xu
Copy link
Contributor Author

shuai-xu commented Mar 6, 2025

After try to run this POC, got error Caused by: io.github.zhztheplayer.velox4j.exception.VeloxException: Exception: VeloxUserError Error Source: USER Error Code: INVALID_ARGUMENT Reason: Splits can be associated only with leaf plan nodes which require splits. Plan node ID 5 doesn't refer to such plan node.

You can try the latest code.

@ParyshevSergey
Copy link

ParyshevSergey commented Mar 6, 2025

After try to run this POC, got error Caused by: io.github.zhztheplayer.velox4j.exception.VeloxException: Exception: VeloxUserError Error Source: USER Error Code: INVALID_ARGUMENT Reason: Splits can be associated only with leaf plan nodes which require splits. Plan node ID 5 doesn't refer to such plan node.

You can try the latest code.

Thanks for reply. I rerun with last commit and gotcha

Caused by: io.github.zhztheplayer.velox4j.exception.VeloxException: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "sources" (class io.github.zhztheplayer.velox4j.plan.TableScanNode), not marked as ignorable (4 known properties: "id", "tableHandle", "outputType", "assignments"])
 at [Source: UNKNOWN; byte offset: #UNKNOWN] (through reference chain: io.github.zhztheplayer.velox4j.plan.TableScanNode["sources"])

I looked at TableScanNode and notice that json getter exist on "sources" but this field is absence. I try rebuild velox4j with Serde change (add disable for jackson DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) and got

Caused by: io.github.zhztheplayer.velox4j.exception.VeloxException: couldn't find key name in dynamic object
	at io.github.zhztheplayer.velox4j.jni.JniWrapper.executeQuery(Native Method)
	at io.github.zhztheplayer.velox4j.jni.JniApi.executeQuery(JniApi.java:52)
	at io.github.zhztheplayer.velox4j.query.Queries.execute(Queries.java:14)
	at org.apache.gluten.table.runtime.operators.GlutenCalOperator.processElement(GlutenCalOperator.java:97)

SQL query

CREATE TABLE srcTbl (id INT, price INT, name STRING) WITH ('connector'='datagen');
CREATE TABLE snkTbl (id INT, price INT) WITH ('connector'='blackhole');
INSERT INTO snkTbl SELECT id, price FROM srcTbl WHERE price > 10;

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
DOCS enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

9 participants