Bug description
When a manager step built with RemotePartitioningManagerStepBuilder is declared with @JobScope, messages from Kafka are received by the inputChannel but are not forwarded to the IntegrationFlow defined in the RemotePartitioningManagerStepBuilder.build() method. This happens because the timing of the IntegrationFlow registration in build() does not align with the delayed bean creation of @JobScope. As a result, the messages are never processed through the configured flow (aggregator -> replies channel).
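The timing mismatch described above can be illustrated with a small toy model. This is only a sketch under stated assumptions, not Spring code: `ToyFlow`, `ToyContext`, and `run` are hypothetical names, and the model assumes that flows are started once at container startup, so a flow registered later (as a job-scoped bean would be) is never started.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model only: none of these types are Spring classes.
class LazyFlowTimingDemo {

    /** Stands in for a flow that drains a queue-backed channel once it has been started. */
    static final class ToyFlow {
        private final Queue<String> input;
        private final List<String> replies = new ArrayList<>();
        private boolean started;

        ToyFlow(Queue<String> input) {
            this.input = input;
        }

        void start() {
            started = true;
        }

        /** Moves pending messages to the replies list, but only if the flow was started. */
        void poll() {
            if (!started) {
                return;
            }
            String message;
            while ((message = input.poll()) != null) {
                replies.add(message);
            }
        }
    }

    /** Stands in for a container that starts all registered flows once, at startup. */
    static final class ToyContext {
        private final List<ToyFlow> flows = new ArrayList<>();

        void register(ToyFlow flow) {
            flows.add(flow);
        }

        void startAll() {
            flows.forEach(ToyFlow::start);
        }
    }

    /**
     * Returns the number of messages left stuck in the input queue.
     * eager == true:  the flow is registered before startup (assumed singleton-like case).
     * eager == false: the flow is registered after startup (assumed @JobScope-like case).
     */
    static int run(boolean eager) {
        Queue<String> inputChannel = new ArrayDeque<>();
        ToyContext context = new ToyContext();
        ToyFlow flow = new ToyFlow(inputChannel);

        if (eager) {
            context.register(flow);
        }
        context.startAll();
        if (!eager) {
            context.register(flow); // too late: startAll() has already run
        }

        inputChannel.add("reply-from-worker-1");
        inputChannel.add("reply-from-worker-2");
        flow.poll();
        return inputChannel.size();
    }

    public static void main(String[] args) {
        System.out.println("eager registration, stuck messages: " + run(true));
        System.out.println("late registration, stuck messages: " + run(false));
    }
}
```

In this model the eagerly registered flow leaves 0 messages in the queue, while the late one leaves both messages stuck, which matches the symptom reported here. Whether the real framework behaves this way internally is an assumption of the sketch.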
Environment
Spring Batch 5.1.2
Spring Boot 3.3.3
Spring Integration core (and Kafka) 6.3.3
Steps to reproduce
Configure typical remote partitioning with Kafka (or other middleware)
Create a manager step using RemotePartitioningManagerStepBuilder and declare the step bean with @JobScope
Expected behavior
Once the reply messages (StepExecution) from the worker nodes are received by the inputChannel, they should be forwarded to the IntegrationFlow defined in RemotePartitioningManagerStepBuilder's build() method. This IntegrationFlow is responsible for aggregating the partition execution results and sending them to the replies channel.
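To make the expected aggregation step concrete, here is a minimal sketch in plain Java. It is not the Spring Integration aggregator: `ReplyAggregator` and `releasedGroupSize` are hypothetical names, replies are modeled as plain strings instead of StepExecution objects, and the release rule (release once the number of replies equals the grid size) is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Toy model only: ReplyAggregator is hypothetical, not a Spring Batch or Spring Integration type.
class ReplyAggregationDemo {

    /** Buffers worker replies and releases them as one group once every partition has answered. */
    static final class ReplyAggregator {
        private final int expectedReplies;
        private final List<String> pending = new ArrayList<>();

        ReplyAggregator(int expectedReplies) {
            this.expectedReplies = expectedReplies;
        }

        /** Returns the complete group when the last expected reply arrives, otherwise empty. */
        Optional<List<String>> accept(String reply) {
            pending.add(reply);
            if (pending.size() < expectedReplies) {
                return Optional.empty();
            }
            List<String> group = List.copyOf(pending);
            pending.clear();
            return Optional.of(group);
        }
    }

    /** Sends repliesSent replies through an aggregator and returns the released group size (0 if none). */
    static int releasedGroupSize(int gridSize, int repliesSent) {
        ReplyAggregator aggregator = new ReplyAggregator(gridSize);
        int released = 0;
        for (int i = 0; i < repliesSent; i++) {
            Optional<List<String>> group = aggregator.accept("partition" + i + ":COMPLETED");
            if (group.isPresent()) {
                released = group.get().size(); // this group would go to the replies channel
            }
        }
        return released;
    }

    public static void main(String[] args) {
        System.out.println("released after 2 of 3 replies: " + releasedGroupSize(3, 2));
        System.out.println("released after 3 of 3 replies: " + releasedGroupSize(3, 3));
    }
}
```

With a grid size of 3, nothing is released after two replies and a group of three is released after the third. In the reported bug, the replies never reach this stage at all.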
Actual behavior
When the manager step is declared with @JobScope, messages from the middleware (e.g., Kafka) are received by the inputChannel but are not forwarded to the IntegrationFlow defined in the build() method.
Minimal Complete Reproducible example
simple
@Bean
public QueueChannel responseChannel() {
    return new QueueChannel();
}

@Bean
public IntegrationFlow receiveWorkerMessages(ConsumerFactory<?, ?> consumerFactory) {
    return IntegrationFlow
            .from(Kafka.messageDrivenChannelAdapter(
                    consumerFactory,
                    "partition-responses")
            )
            .channel(responseChannel())
            .get();
}

@Bean
@JobScope // This causes the issue. But @JobScope is necessary here to receive job parameters.
public Step managerStep(
        RemotePartitioningManagerStepBuilderFactory managerStepBuilderFactory) {
    return managerStepBuilderFactory.get("partitionManagerStep")
            .partitioner("workerStep", new SimplePartitioner())
            .gridSize(3)
            .outputChannel(/* output channel config */)
            .inputChannel(responseChannel())
            .build();
}