Skip to content

Commit

Permalink
Update doc for reader-processor
Browse files Browse the repository at this point in the history
Signed-off-by: Taeik Lim <[email protected]>
  • Loading branch information
acktsap authored Feb 2, 2025
1 parent 0408b7e commit 8423609
Show file tree
Hide file tree
Showing 8 changed files with 1,219 additions and 16 deletions.
154 changes: 152 additions & 2 deletions doc/en/step/item-stream-flux-reader-processor-writer.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
- [Create a tasklet without a processor](#create-a-tasklet-without-a-processor)
- [Java](#java-1)
- [Kotlin](#kotlin-1)
- [Use a callback](#use-a-callback)
- [Create a tasklet without a writer](#create-a-tasklet-without-a-writer)
- [Java](#java-2)
- [Kotlin](#kotlin-2)
- [Use a callback](#use-a-callback)
- [Java](#java-3)
- [Kotlin](#kotlin-3)

Spring uses a reactive library called [Reactor](https://projectreactor.io/), which provides streams for various types of data using `Flux`. For `ItemReader` in Spring Batch to use the data read using `Flux`, you need to extract each item from `Flux` and return it.

Expand Down Expand Up @@ -326,9 +329,156 @@ open class TestJobConfig(
}
```

## Create a tasklet without a writer

If you need only `ItemStreamReader` and `ItemProcessor` without a writer, you can inherit `ItemStreamFluxReaderProcessor` to define `ItemStreamReader` and `ItemProcessor` in a single class.

### Java

In Java, you can convert a tasklet defined using `AdapterFactory` to `ItemStreamReader` and `ItemProcessor`.

```java
@Component
@StepScope
class SampleTasklet implements ItemStreamFluxReaderProcessor<Integer, String> {

@Value("#{jobParameters['totalCount']}")
private long totalCount;

private int count = 0;

@NonNull
@Override
public Flux<? extends Integer> readFlux(@NonNull ExecutionContext executionContext) {
System.out.println("totalCount: " + totalCount);
return Flux.generate(sink -> {
if (count < totalCount) {
sink.next(count);
++count;
} else {
sink.complete();
}
});
}

@Override
public String process(@NonNull Integer item) {
return "'" + item.toString() + "'";
}
}
```

```java
@Configuration
public class TestJobConfig {

@Bean
public Job testJob(
SampleTasklet sampleTasklet,
JobRepository jobRepository,
PlatformTransactionManager transactionManager
) {
return new JobBuilder("testJob", jobRepository)
.start(
new StepBuilder("testStep", jobRepository)
.<Integer, String>chunk(3, transactionManager)
.reader(AdapterFactory.itemStreamReader(sampleTasklet))
.processor(AdapterFactory.itemProcessor(sampleTasklet))
.writer(chunk -> System.out.println(chunk.getItems()))
.build()
)
.build();
}
}
```

You can statically import the method of `AdapterFactory` for better readability.

```java
import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamReader;
import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamWriter;

...

@Configuration
public class TestJobConfig {

@Bean
public Job testJob(
SampleTasklet sampleTasklet,
JobRepository jobRepository,
PlatformTransactionManager transactionManager
) {
return new JobBuilder("testJob", jobRepository)
.start(
new StepBuilder("testStep", jobRepository)
.<Integer, String>chunk(3, transactionManager)
.reader(itemStreamReader(sampleTasklet))
.processor(itemProcessor(sampleTasklet))
.writer(chunk -> System.out.println(chunk.getItems()))
.build()
)
.build();
}
}
```

### Kotlin

In Kotlin, you can easily convert a tasklet defined using an extension function in Spring Batch Plus to `ItemStreamReader` and `ItemProcessor`.

```Kotlin
@Component
@StepScope
open class SampleTasklet(
@Value("#{jobParameters['totalCount']}") private var totalCount: Long,
) : ItemStreamFluxReaderProcessor<Int, String> {
private var count = 0

override fun readFlux(executionContext: ExecutionContext): Flux<out Int> {
println("totalCount: $totalCount")
return Flux.generate { sink ->
if (count < totalCount) {
sink.next(count)
++count
} else {
sink.complete()
}
}
}

override fun process(item: Int): String? {
return "'$item'"
}
}
```

```Kotlin
@Configuration
open class TestJobConfig(
private val batch: BatchDsl,
private val transactionManager: PlatformTransactionManager,
) {
@Bean
open fun testJob(
sampleTasklet: SampleTasklet,
): Job = batch {
job("testJob") {
step("testStep") {
chunk<Int, String>(3, transactionManager) {
reader(sampleTasklet.asItemStreamReader())
processor(sampleTasklet.asItemProcessor())
writer { chunk -> println(chunk.items) }
}
}
}
}
}
```

## Use a callback

You can define a callback method for `ItemStream` of `ItemStreamWriter` in `ItemStreamFluxReaderProcessorWriter` and `ItemStreamFluxReaderWriter`. You can selectively define a callback method.
You can define a callback method for `ItemStream`. You can selectively define a callback method.

### Java

Expand Down
160 changes: 158 additions & 2 deletions doc/en/step/item-stream-iterable-reader-processor-writer.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
- [Create a tasklet without a processor](#create-a-tasklet-without-a-processor)
- [Java](#java-1)
- [Kotlin](#kotlin-1)
- [Use a callback](#use-a-callback)
- [Create a tasklet without a writer](#create-a-tasklet-without-a-writer)
- [Java](#java-2)
- [Kotlin](#kotlin-2)
- [Use a callback](#use-a-callback)
- [Java](#java-3)
- [Kotlin](#kotlin-3)

A chunk-oriented step in Spring Batch consists of `ItemReader`, `ItemProcessor`, and `ItemWriter`, which are usually defined separately and then assembled to define a step. However, there are some issues with this approach: it is difficult to share data between `ItemReader`, `ItemProcessor`, and `ItemWriter`, and you need to see each respective file to understand the batch flow. Also, if the classes are not reused, they can make the elements of a job less coherent.

Expand Down Expand Up @@ -328,9 +331,162 @@ open class TestJobConfig(
}
```

## Create a tasklet without a writer

If you need only `ItemStreamReader` and `ItemProcessor` without a writer, you can inherit `ItemStreamIterableReaderProcessor` to define `ItemStreamReader` and `ItemProcessor` in a single class.

### Java

In Java, you can convert a tasklet defined using `AdapterFactory` to `ItemStreamReader` and `ItemProcessor`.

```java
@Component
@StepScope
class SampleTasklet implements ItemStreamIterableReaderProcessor<Integer, String> {

@Value("#{jobParameters['totalCount']}")
private long totalCount;

private int count = 0;

@NonNull
@Override
public Iterable<? extends Integer> readIterable(@NonNull ExecutionContext executionContext) {
System.out.println("totalCount: " + totalCount);
return () -> new Iterator<>() {
@Override
public boolean hasNext() {
return count < totalCount;
}

@Override
public Integer next() {
return count++;
}
};
}

@Override
public String process(@NonNull Integer item) {
return "'" + item.toString() + "'";
}
}
```

```java
@Configuration
public class TestJobConfig {

@Bean
public Job testJob(
SampleTasklet sampleTasklet,
JobRepository jobRepository,
PlatformTransactionManager transactionManager
) {
return new JobBuilder("testJob", jobRepository)
.start(
new StepBuilder("testStep", jobRepository)
.<Integer, String>chunk(3, transactionManager)
.reader(AdapterFactory.itemStreamReader(sampleTasklet))
.processor(AdapterFactory.itemProcessor(sampleTasklet))
.writer(chunk -> System.out.println(chunk.getItems()))
.build()
)
.build();
}
}
```

You can statically import the method of `AdapterFactory` for better readability.

```java
import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamReader;
import static com.navercorp.spring.batch.plus.step.AdapterFactory.itemStreamWriter;

...

@Configuration
public class TestJobConfig {

@Bean
public Job testJob(
SampleTasklet sampleTasklet,
JobRepository jobRepository,
PlatformTransactionManager transactionManager
) {
return new JobBuilder("testJob", jobRepository)
.start(
new StepBuilder("testStep", jobRepository)
.<Integer, String>chunk(3, transactionManager)
.reader(itemStreamReader(sampleTasklet))
.processor(itemProcessor(sampleTasklet))
.writer(chunk -> System.out.println(chunk.getItems()))
.build()
)
.build();
}
}
```

### Kotlin

In Kotlin, you can easily convert a tasklet defined using an extension function in Spring Batch Plus to `ItemStreamReader` and `ItemProcessor`.

```Kotlin
@Component
@StepScope
open class SampleTasklet(
@Value("#{jobParameters['totalCount']}") private var totalCount: Long,
) : ItemStreamIterableReaderProcessor<Int, String> {
private var count = 0

override fun readIterable(executionContext: ExecutionContext): Iterable<Int> {
println("totalCount: $totalCount")
return Iterable {
object : Iterator<Int> {
override fun hasNext(): Boolean {
return count < totalCount
}

override fun next(): Int {
return count++
}
}
}
}

override fun process(item: Int): String? {
return "'$item'"
}
}
```

```Kotlin
@Configuration
open class TestJobConfig(
private val batch: BatchDsl,
private val transactionManager: PlatformTransactionManager,
) {
@Bean
open fun testJob(
sampleTasklet: SampleTasklet,
): Job = batch {
job("testJob") {
step("testStep") {
chunk<Int, String>(3, transactionManager) {
reader(sampleTasklet.asItemStreamReader())
processor(sampleTasklet.asItemProcessor())
writer { chunk -> println(chunk.items) }
}
}
}
}
}
```

## Use a callback

You can define a callback method for `ItemStream` of `ItemStreamWriter` in `ItemStreamIterableReaderProcessorWriter` and `ItemStreamIterableReaderWriter`. You can selectively define a callback method.
You can define a callback method for `ItemStream`. You can selectively define a callback method.

### Java

Expand Down
Loading

0 comments on commit 8423609

Please sign in to comment.