forked from politrons/reactive
-
Notifications
You must be signed in to change notification settings - Fork 1
/
ObservableCompose.java
62 lines (49 loc) · 2.19 KB
/
ObservableCompose.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package rx.observables.transforming;
import org.junit.Test;
import rx.Observable;
import rx.Scheduler;
import rx.schedulers.Schedulers;
/**
* Using compose we can get an observable and transform into another type or value.
* We pass to compose constantClass Transformer function that get the observable and change his type or value
*/
public class ObservableCompose {
private Scheduler mainThread = Schedulers.newThread();
Observable.Transformer<Integer, Integer> applySchedulers() {
return observable -> observable.subscribeOn(Schedulers.io())
.observeOn(mainThread);
}
Observable.Transformer<Integer, String> transformIntegerToString() {
return observable -> observable.map(String::valueOf);
}
/**
* In this example we apply in the observable that all actions before the compose are executed in another thread.
* But the result in the observer is processed in the main thread.
*/
@Test
public void observableWithScheduler() throws InterruptedException {
Observable.just(1)
.map(number -> {
System.out.println("Item processed on thread:" + Thread.currentThread()
.getName());
return number;
})
.compose(applySchedulers())
.subscribe(number -> System.out.println("Result in thread:" + Thread.currentThread()
.getName()));
Thread.sleep(1000);
}
/**
* In this example we use constantClass transformer to get the Integer item emitted and transform to String
*/
@Test
public void observableWithTransformToString() {
Observable.just(1)
.map(number -> {
System.out.println("Item is Integer:" + Integer.class.isInstance(number));
return number;
})
.compose(transformIntegerToString())
.subscribe(number -> System.out.println("Item is String:" + (String.class.isInstance(number))));
}
}