-
Notifications
You must be signed in to change notification settings - Fork 13
3‐流的链式调用
链式调用是流的核心功能,它由一个流触发,返回一个新的流,中间可对数据进行映射、过滤、展平、开窗等等各种操作。
大部分链式调用主要集中于一元流,但也存在一元流、二元流、三元流互转的情况,放在后面讨论。
map
代表流的映射,是指元素一对一转换,由一个类型为T
的流转变为E
的流,通常在各个语言中都用map
来定义这类方法
// (1, 2, 3, 4) -> map(i -> i + 10) -> (11, 12, 13, 14)
default <E> Seq<E> map(Function<T, E> function)
带下标映射
// (1, 2, 3, 4) -> mapIndexed((index, i) -> index % 2 == 0 ? i : i + 10) -> (11, 2, 13, 4)
default <E> Seq<E> mapIndexed(IndexObjFunction<T, E> function)
仅对非空元素映射,相当于先过滤非空,再做映射
// (1, 2, null, 4) -> mapMaybe(i -> i + 10) -> (10, 20, 40)
default <E> Seq<E> mapMaybe(Function<T, E> function)
映射后过滤非空元素
// (1, 2, 3, 4) -> mapNotNull(i -> i % 2 == 0 ? null : i + 10) -> (10, 30)
default <E> Seq<E> mapNotNull(Function<T, E> function)
按照条件过滤元素
// (1, 2, 3, 4) -> filter(i -> i % 2 == 0) -> (2, 4)
default Seq<T> filter(Predicate<T> predicate)
带下标过滤
// (1, 2, 3, 4) -> filterIndexed((index, i) -> index % 2 == 0) -> (1, 3)
default Seq<T> filterIndexed(IndexObjPredicate<T> predicate)
过滤交集
// (1, 2, 3, 4) -> filterIn([3, 4, 5]) -> (3, 4)
default Seq<T> filterIn(Collection<T> collection)
// (1, 2, 3, 4) -> filterIn({3: "x", 4: "y", 5: "z"}) -> (3, 4)
default Seq<T> filterIn(Map<T, ?> map)
按类型过滤
// (dog, cat, rock, water) -> filterInstance(Animal.class) -> (dog, cat)
default <E> Seq<E> filterInstance(Class<E> cls)
按条件否过滤
default Seq<T> filterNot(Predicate<T> predicate)
default Seq<T> filterNotIn(Collection<T> collection)
default Seq<T> filterNotIn(Map<T, ?> map)
非空过滤
// (1, 2, null, 4) -> filterNotNull() -> (1, 2, 4)
default Seq<T> filterNotNull()
flatMap
即为流的展平,就是将流的每一个元素展开为一个新的流,并将他们合并到一起,通常用flatMap
命名。
如果你将Seq
也看作是一种Monad
(建议别这么看,Monad
玩法太少,会限制你的认知),那flatMap
即是它的bind
方法。
// (1, 2, 3, 4) -> flatMap(i -> repeat(2, i)) -> (1, 1, 2, 2, 3, 3, 4, 4)
default <E> Seq<E> flatMap(Function<T, Seq<E>> function)
展平流有一种特殊的场景,那是展平前后元素类型相同,即T
-> Seq
<T
>,相同的类型签名为其带来了可递归的特性,从而可以推导出树的展平,或者说流式遍历。
有些比较新的库会大量使用Optional
,那相应的对多个map
之后的Optional
,可以使用flatOptional
进行展平
// (1, 2, 3, 4) -> flatOptional(i -> Optional.of(i + 10)) -> (11, 12, 13, 14)
default <E> Seq<E> flatOptional(Function<T, Optional<E>> function)
提取前n项元素。在Java Stream中这个方法叫limit
,在Kotlin Sequence中这个方法叫take
,这也是更为标准和通用的叫法
default Seq<T> take(int n)
提取元素直到判定失败,后续的元素丢弃
default Seq<T> takeWhile(Predicate<T> predicate)
除此以外,还有一些衍生方法,例如相邻两两校验
// (1, 1, 1, 2) -> takeWhile(Integer::equals) -> (1, 1, 1)
default Seq<T> takeWhile(BiPredicate<T, T> testPrevCurr)
以及根据映射函数两两校验
// (1, 1, 1, 2, 3) -> takeWhile(i -> i / 2, Integer::equals) -> (1, 1, 1, 2)
default <E> Seq<T> takeWhile(Function<T, E> function, BiPredicate<E, E> testPrevCurr)
丢弃前n项元素,在Java Stream中这个方法叫skip
,在Kotlin Sequence中这个方法叫drop
,这也是更为标准和通用的叫法
default Seq<T> drop(int n)
丢弃元素直到判定成功,后续元素保留
default Seq<T> dropWhile(Predicate<T> predicate)
处理元素但不消费,在Java Stream中这个方法叫peek
,在Kotlin Sequence中这个方法叫onEach
,我们沿用Kotlin的称呼
// (1, 2, 3, 4) -> onEach(i -> print(i)) -> (1, 2, 3, 4) while print each
default Seq<T> onEach(Consumer<T> consumer)
带下标处理
// (1, 2, 3, 4) -> onEachIndexed((index, i) -> if (index % 2 == 0) print(i)) -> (1, 2, 3, 4) while print 2, 4
default Seq<T> onEachIndexed(IndexObjConsumer<T> consumer)
流本身是惰性加载的,无论套了多少层map
和filter
,实际执行时也是每个元素先做完所有的变换,被处理完成后,才会继续处理下一个元素。很多场景下,有必要用cache
对流进行缓存。
流的缓存有两个目的或者说价值,一是流本身的产生是高成本的,例如读一个文本文件,形成String
流,当缓存下来后就可以直接重复利用而不是再读一次文件。二是流通过了一次高成本或者难以复现的map
,例如做了一次IO,读了随机数,by时间进行了窗口聚合等等。为了确保下次使用时依然是原来的流,或者看重性能,都有必要进行缓存。
可以想到,最直观的将流进行的缓存的方案是把它放进一个List
里,或者更进一步,放进SeqList
里。然而,无论是ArrayList
还是LinkedList
,其实现方式在进行未知长度大批量数据连续add
的场景下,都存在一些缺陷,前者需要不断的拷贝和丢弃数组,后者每个元素本身开销较大。所以更好的方式是结合二者各自优点,缝合成一个"Linked-Array-List"。
简而言之,就是先像ArrayList
那样,每次数组填满后,产出一个长度为当前总size
1/2(且有一个上限)的一个新数组,并把它和上一个数组"link"起来,例如
// 假设初始数组size=4,第二个数组size=4/2=2,第三个数组size=(4+2)/2=3
(1, 2, 3, 4, 5, 6, 7, 8, 9) -> ([1, 2, 3, 4]=>[5, 6]=>[7, 8, 9])
由此,可以获得远好于List
的性能表现。
无论怎么排序,都需要先获取全部元素,将流聚合为一个ArraySeq
,而后再做操作,所以排序函数本质上也是聚合函数
自然顺序排序
default ArraySeq<T> sorted()
自然顺序逆排序
default ArraySeq<T> sortedDesc()
使用比较器排序
default ArraySeq<T> sortWith(Comparator<T> comparator)
使用比较器逆排序
default ArraySeq<T> sortWithDesc(Comparator<T> comparator)
按照值函数排序
default <E extends Comparable<E>> ArraySeq<T> sortBy(Function<T, E> function)
按照值函数逆排序
default <E extends Comparable<E>> ArraySeq<T> sortByDesc(Function<T, E> function)
有时在使用值函数进行排序时,值函数本身开销较大,每个元素比较大小的复杂度是O(n),故而最好进行缓存。
按照值函数缓存并排序
default <E extends Comparable<E>> Seq<T> sortCached(Function<T, E> function)
按照值函数缓存并逆排序
default <E extends Comparable<E>> Seq<T> sortCachedDesc(Function<T, E> function)
翻转流,顾名思义,将流的元素从后往前翻转一遍
// (1, 2, 3, 4) -> reverse() -> (4, 3, 2, 1)
default ArraySeq<T> reverse()
累加流,即从第一个元素开始,后续每个元素都是前面n项和自己的累加,称之为runningFold
// (1, 2, 3, 4) -> runningFold(0, Integer::sum) -> (1, 3, 6, 10)
default <E> Seq<E> runningFold(E init, BiFunction<E, T, E> function)
流的标准API之一,元素去重
// (1, 1, 2, 2, 3) -> distinct() -> (1, 2, 3)
default Seq<T> distinct()
指定值函数去重
// (1, 1, 2, 2, 3) -> distinctBy(i -> i / 2) -> (1, 3)
default <E> Seq<T> distinctBy(Function<T, E> function)
所有流的分段处理,都基于这个分段消费的方法实现,它使得着流的前n项元素和后面的元素可以使用不同的方式消费
default void consume(Consumer<T> consumer, int n, Consumer<T> substitute)
流的部分消费,只按照指定方式消费前n项,后面元素保留,称之为partial
// (1, 2, 3, 4) -> partial(2, System.out::println) -> (3, 4) while print 1, 2
default Seq<T> partial(int n, Consumer<T> substitute) {
return c -> consume(c, n, substitute);
}
map
可以指定前n项和后面的剩余项分别使用不同的方式映射,相当于一个随下标变化的函数。
典型的应用场景就是在读取CSV文件时需要对表头和数据采用不同的处理方式,又或者在将下划线转驼峰时,需要对第一个单词小写,并对后续的单词大写。
// (1, 2, 3, 4) -> map(i -> i + 10, 2, i -> i + 100) -> (101, 102, 13, 14)
default <E> Seq<E> map(Function<T, E> function, int n, Function<T, E> substitute) {
return c -> consume(t -> c.accept(function.apply(t)), n, t -> c.accept(substitute.apply(t)));
}
只在前n项里过滤
// (1, 2, 3, 4) -> filter(2, i -> i % 2 == 0) -> (2, 3, 4)
default Seq<T> filter(int n, Predicate<T> predicate)
只处理前n项
// (1, 2, 3, 4) -> onEach(2, System.out::println) -> (1, 2, 3, 4) while print 1, 2
default Seq<T> onEach(int n, Consumer<T> consumer) {
return c -> consume(c, n, consumer.andThen(c));
}
流的部分元素替换,将前n项元素替换为新的值,相当于一个特殊场景的分段map
// (1, 2, 3, 4) -> replace(2, i -> i + 10) -> (11, 12, 3, 4)
default Seq<T> replace(int n, UnaryOperator<T> operator)
所谓窗口函数就是对流的元素按照某种规则进行局部聚合,每一个小组聚合为整体后,构成一个新的流。聚合的逻辑通常有三种,按次数,按时间,按头尾元素特征。
窗口函数也是一种特殊的链式调用。
每n个元素分为一组
// (1, 2, 3, 4, 5, 6, 7, 8) -> chunked(3) -> ([1, 2, 3], [4, 5, 6], [7, 8])
default Seq<ArraySeq<T>> chunked(int size)
相邻若干元素分为一组。
连续满足条件,分为一组,不满足即中断
// (1, 1, 2, 2, 2, 3, 4, 4, 5) -> mapSub(isEven) -> ([2, 2, 2], [4, 4])
default Seq<ArraySeq<T>> mapSub(Predicate<T> takeWhile)
指定分组的聚合方式为toSet
(默认为toList
)
// (1, 1, 2, 2, 2, 3, 4, 4, 5) -> mapSub(isEven, toSet) -> ({2}, {4})
default <V> Seq<V> mapSub(Predicate<T> takeWhile, Reducer<T, V> reducer)
指定分组的开始条件与结束条件,如下示例为奇数开启,偶数结束
// (1, 1, 2, 2, 2, 3, 4, 4, 5) -> mapSub(isOdd, isEven, toList) -> ([1, 1, 2], [3, 4])
default <V> Seq<V> mapSub(Predicate<T> first, Predicate<T> last, Reducer<T, V> reducer)
按数量滑动开窗,当size
和step
相等时,效果等价于chunked
// (1, 2, 3, 4) -> windowed(3, 1, true) -> ([1, 2, 3], [2, 3, 4], [3, 4], [4])
// (1, 2, 3, 4) -> windowed(3, 1, false) -> ([1, 2, 3], [2, 3, 4])
// (1, 2, 3, 4) -> windowed(3, 2, true) -> ([1, 2, 3], [3, 4])
// (1, 2, 3, 4) -> windowed(3, 2, false) -> ([1, 2, 3])
default Seq<ArraySeq<T>> windowed(int size, int step, boolean allowPartial)
按时间开窗,需要热流和异步流发布后才能完全发挥价值,效果与windowed
类似,只不过将数量窗替换为了时间窗。
指定时间窗、步长、聚合方式开窗
default <V> Seq<V> windowedByTime(long timeMillis, long stepMillis, Reducer<T, V> reducer)
指定时间窗、步长开窗,默认聚合方式为toList
default Seq<ArraySeq<T>> windowedByTime(long timeMillis, long stepMillis)
指定时间窗开窗,默认聚合方式为toList
,默认滚动模式
default Seq<ArraySeq<T>> windowedByTime(long timeMillis)