
Make non-portable Splittable DoFn the only option when executing Java "Read" transforms #20530

Closed
damccorm opened this issue Jun 4, 2022 · 3 comments


@damccorm
Contributor

damccorm commented Jun 4, 2022

All runners seem capable of migrating to splittable DoFn for non-portable execution, except for Dataflow runner v1, which will internalize the current primitive Read implementation that is shared across runner implementations.

Imported from Jira BEAM-10670. Original Jira may contain additional context.
Reported by: lcwik.
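For illustration, a minimal sketch (not from the original Jira) of a pipeline built on the Java Read primitive whose expansion this issue is about; whether it runs as the legacy primitive Read or as a Splittable DoFn depends on the runner and its configuration:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.CountingSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ReadPrimitiveSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    // The same Read.from(...) transform may execute as the legacy primitive Read
    // or be expanded into a Splittable DoFn, depending on the runner.
    p.apply(Read.from(CountingSource.upTo(1_000)));
    p.run().waitUntilFinish();
  }
}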

@aditiwari01

Hi @damccorm

I was trying KafkaIO with FlinkRunner but am running into the following issue:

Exception in thread "main" java.lang.IllegalStateException: No translator known for org.apache.beam.runners.core.construction.SplittableParDo$PrimitiveUnboundedRead
	at org.apache.beam.runners.core.construction.PTransformTranslation.urnForTransform(PTransformTranslation.java:283)
	at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:135)
	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:593)
	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
	at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
	at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:240)
	at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:214)
	at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:469)
	at org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
	at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:92)
	at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:115)
	at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:105)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
	at BeamPipelineKafka.main(BeamPipelineKafka.java:54)

Since you mentioned that all runners are capable of Splittable DoFn, is there anything I am missing?

I have also tried "--experiments=use_deprecated_read" to use the primitive read, but I am still facing the same issue.
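For context, this is roughly the shape of the pipeline, a minimal sketch rather than the actual code; the broker address, topic, and class names are placeholders:

import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BeamPipelineKafkaSketch {
  public static void main(String[] args) {
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);

    Pipeline p = Pipeline.create(options);
    // KafkaIO.read() is the unbounded read whose translation fails above.
    p.apply(
        KafkaIO.<Long, String>read()
            .withBootstrapServers("localhost:9092") // placeholder broker
            .withTopic("my-topic")                  // placeholder topic
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata());
    p.run().waitUntilFinish();
  }
}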

@kennknowles
Member

I don't think anyone is actively pursuing this goal at the moment. I think that the portable FlinkRunner is the one that has splittable DoFn support. They are pretty independent runners, I believe.

@github-actions github-actions bot added this to the 2.52.0 Release milestone Sep 26, 2023
@kennknowles
Member

The current (bad) status is that all non-Dataflow runners will use legacy read if the runner is set up prior to expansion. This results in non-portable expansion behaviors.

The desired status would be that runners override the SDF read to legacy read if desired. The code to do this is already shipped with KafkaIO and used in the Dataflow runner, but it would be some real work, and probably throwaway work, to adjust other runners to use the override. More likely we just push everything to SDF.
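For reference, a minimal sketch, assuming the current Java SDK experiment mechanism, of how a pipeline author opts into the legacy read expansion via the use_deprecated_read flag mentioned above:

import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class UseDeprecatedReadSketch {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    // Request the legacy primitive Read expansion; runners that only understand
    // the SDF expansion may ignore this experiment.
    ExperimentalOptions.addExperiment(
        options.as(ExperimentalOptions.class), "use_deprecated_read");
    // ... build and run the pipeline with these options ...
  }
}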
