Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RemotePartitioningManagerStepBuilder's IntegrationFlow not working with @JobScope #4702

Open
logan-28 opened this issue Nov 7, 2024 · 0 comments
Labels
status: waiting-for-triage Issues that we did not analyse yet type: bug

Comments

@logan-28
Copy link

logan-28 commented Nov 7, 2024

Please do a quick search on Github issues first, there might be already a duplicate issue for the one you are about to create.
If the bug is trivial, just go ahead and create the issue. Otherwise, please take a few moments and fill in the following sections:

Bug description
When using RemotePartitioningManagerStepBuilder with @JobScope, messages from Kafka are received by the inputChannel but not forwarded to the IntegrationFlow defined in the RemotePartitioningManagerStepBuilder.build() method. This issue occurs because the IntegrationFlow registration timing in the build() method doesn't align with the delayed bean creation of @JobScope, resulting in messages not being processed through the configured flow (aggregator -> replies channel).

Environment
spring batch 5.1.2
spring boot 3.3.3
spring integration core(&kafka): 6.3.3

Please provide as many details as possible: Spring Batch version, Java version, which database you use if any, etc

Steps to reproduce

  1. Configure typical remote partitioning with Kafka as middleware(or else)
  2. Create a manager step using RemotePartitioningManagerStepBuilder
  3. Apply @JobScope to the manager step bean
  4. Run workers & then run manager
  5. receive message from middleware

Expected behavior
Once messages(reply: StepExecution) from worker nodes are received by the inputChannel, they should be forwarded to the IntegrationFlow defined in RemotePartitioningManagerStepBuilder's build() method. This IntegrationFlow is responsible for aggregating the partition execution results and sending them to the replies channel.

Actual behavior
When using @JobScope step, messages from middleware(ex: Kafka) are received by the inputChannel but not forwarded to the IntegrationFlow defined in the build() method.

Minimal Complete Reproducible example
simple

@Bean
    public QueueChannel responseChannel() {
        return new QueueChannel();
    }
    
    @Bean
    public IntegrationFlow receiveWorkerMessages(ConsumerFactory<?, ?> consumerFactory) {
        return IntegrationFlow
            .from(Kafka.messageDrivenChannelAdapter(
                consumerFactory,
                "partition-responses")
            )
            .channel(responseChannel())
            .get();
    }
    
@Bean
   @JobScope  // This causes the issue. But @JobScope is necessary here to receive job parameters.
   public Step managerStep(
           RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory) {
       return managerStepBuilderFactory.get("partitionManagerStep")
           .partitioner("workerStep", new SimplePartitioner())
           .gridSize(3)
           .outputChannel(/* output channel config */)
           .inputChannel(responseChannel())
           .build();
   }
@logan-28 logan-28 added status: waiting-for-triage Issues that we did not analyse yet type: bug labels Nov 7, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
status: waiting-for-triage Issues that we did not analyse yet type: bug
Projects
None yet
Development

No branches or pull requests

1 participant