-
-
Notifications
You must be signed in to change notification settings - Fork 57
/
ObservableMerge.java
96 lines (79 loc) · 2.94 KB
/
ObservableMerge.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
package rx.observables.combining;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import rx.Observable;
import rx.Scheduler;
import rx.observables.transforming.Person;
import rx.schedulers.Schedulers;
/**
* Merge takes all the observables passed to it, gathers them into an array,
* and then iterates over that array, emitting the items of every observable through a single pipeline.
*/
public class ObservableMerge {

    /** Upper bound, in seconds, that an asynchronous example waits for its pipeline to terminate. */
    private static final long TIMEOUT_SECONDS = 5;

    /**
     * Merges two single-item observables; subscribing to the merged observable emits the items of both.
     * Shall print
     * Person{name='pablo', age=34, sex='null'}
     * Person{name='null', age=25, sex='male'}
     */
    @Test
    public void testMerge() {
        Observable.merge(obPerson(), obPerson1())
                .subscribe(result -> showResult(result.toString()));
    }

    /**
     * Merges two lists of integers, collects every merged item into a single list and sorts that list
     * once, when the collected list is emitted (not after every added item).
     * Shall print
     * <p>
     * [1, 2, 3, 4, 5, 10, 11, 12, 13, 14, 15]
     */
    @Test
    public void testMergeLists() {
        Observable.merge(
                Observable.from(Arrays.asList(2, 1, 13, 11, 5)),
                Observable.from(Arrays.asList(10, 4, 12, 3, 14, 15)))
                .collect(ArrayList<Integer>::new, ArrayList::add)
                .doOnNext(Collections::sort)
                .subscribe(System.out::println);
    }

    /**
     * Using mergeDelayError we ensure that all merged items are emitted even if one of the sources
     * fails: the error is held back until the other sources are done, and the pipeline then ends
     * through the onError callback instead of onComplete.
     * <p>
     * Both sources are subscribed on the io scheduler and observed on another thread, so the
     * pipeline runs asynchronously. The test therefore blocks on a latch until the final action has
     * run; otherwise the method could return before anything was emitted.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for the pipeline
     */
    @Test
    public void testMergeDelayError() throws InterruptedException {
        CountDownLatch terminated = new CountDownLatch(1);
        Scheduler scheduler = Schedulers.newThread();
        Observable.mergeDelayError(
                Observable.error(new RuntimeException())
                        .observeOn(scheduler)
                        .subscribeOn(Schedulers.io()),
                Observable.just("Hello")
                        .observeOn(scheduler)
                        .subscribeOn(Schedulers.io()))
                .finallyDo(() -> {
                    System.out.println("Finally action");
                    // Release the test thread only after the terminal event has been handled.
                    terminated.countDown();
                })
                .subscribe(System.out::println,
                        System.out::println,
                        () -> System.out.println("On complete it should never happen"));
        if (!terminated.await(TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            throw new AssertionError("mergeDelayError pipeline did not terminate within "
                    + TIMEOUT_SECONDS + " seconds");
        }
    }

    /**
     * Merges an observable of five single-item observables, passing 2 as the maximum number of
     * inner observables subscribed to concurrently, then collects the items into a list and sorts it.
     * Shall print
     * <p>
     * [1, 2, 3, 4, 5]
     */
    @Test
    public void testMergeMaxConcurrency() {
        Observable.merge(Observable.just(
                Observable.just(3),
                Observable.just(5),
                Observable.just(1),
                Observable.just(4),
                Observable.just(2)), 2)
                .collect(ArrayList<Integer>::new, ArrayList::add)
                .doOnNext(Collections::sort)
                .subscribe(System.out::println);
    }

    /** Prints the given text to standard output. */
    private void showResult(String s) {
        System.out.println(s);
    }

    /** Returns an observable emitting a single person built from name "pablo", age 34 and a null sex. */
    private Observable<Person> obPerson() {
        return Observable.just(new Person("pablo", 34, null));
    }

    /** Returns an observable emitting a single person built from a null name, age 25 and sex "male". */
    private Observable<Person> obPerson1() {
        return Observable.just(new Person(null, 25, "male"));
    }
}