Skip to content

3‐流的链式调用

wolray edited this page Oct 12, 2023 · 5 revisions

链式调用

链式调用是流的核心功能,它由一个流触发,返回一个新的流,中间可对数据进行映射、过滤、展平、开窗等等各种操作。

大部分链式调用主要集中于一元流,但也存在一元流、二元流、三元流互转的情况,放在后面讨论。

map

map代表流的映射,是指元素一对一转换,由一个类型为T的流转变为E的流,通常在各个语言中都用map来定义这类方法

// (1, 2, 3, 4) -> map(i -> i + 10) -> (11, 12, 13, 14)
default <E> Seq<E> map(Function<T, E> function)

mapIndexed

带下标映射

// (1, 2, 3, 4) -> mapIndexed((index, i) -> index % 2 == 0 ? i : i + 10) -> (11, 2, 13, 4)
default <E> Seq<E> mapIndexed(IndexObjFunction<T, E> function)

mapMaybe

仅对非空元素映射,相当于先过滤非空,再做映射

// (1, 2, null, 4) -> mapMaybe(i -> i + 10) -> (10, 20, 40)
default <E> Seq<E> mapMaybe(Function<T, E> function)

mapNotNull

映射后过滤非空元素

// (1, 2, 3, 4) -> mapNotNull(i -> i % 2 == 0 ? null : i + 10) -> (10, 30)
default <E> Seq<E> mapNotNull(Function<T, E> function)

filter

按照条件过滤元素

// (1, 2, 3, 4) -> filter(i -> i % 2 == 0) -> (2, 4)
default Seq<T> filter(Predicate<T> predicate)

filterIndexed

带下标过滤

// (1, 2, 3, 4) -> filterIndexed((index, i) -> index % 2 == 0) -> (1, 3)
default Seq<T> filterIndexed(IndexObjPredicate<T> predicate)

filterIn

过滤交集

取collection交集

// (1, 2, 3, 4) -> filterIn([3, 4, 5]) -> (3, 4)
default Seq<T> filterIn(Collection<T> collection)

取map交集

// (1, 2, 3, 4) -> filterIn({3: "x", 4: "y", 5: "z"}) -> (3, 4)
default Seq<T> filterIn(Map<T, ?> map)

filterInstance

按类型过滤

// (dog, cat, rock, water) -> filterInstance(Animal.class) -> (dog, cat)
default <E> Seq<E> filterInstance(Class<E> cls)

filterNot/filterNotIn

按条件否过滤

default Seq<T> filterNot(Predicate<T> predicate)
default Seq<T> filterNotIn(Collection<T> collection)
default Seq<T> filterNotIn(Map<T, ?> map)

filterNotNull

非空过滤

// (1, 2, null, 4) -> filterNotNull() -> (1, 2, 4)
default Seq<T> filterNotNull()

flatMap

flatMap即为流的展平,就是将流的每一个元素展开为一个新的流,并将他们合并到一起,通常用flatMap命名。

如果你将Seq也看作是一种Monad(建议别这么看,Monad玩法太少,会限制你的认知),那flatMap即是它的bind方法。

// (1, 2, 3, 4) -> flatMap(i -> repeat(2, i)) -> (1, 1, 2, 2, 3, 3, 4, 4)
default <E> Seq<E> flatMap(Function<T, Seq<E>> function)

展平流有一种特殊的场景,那是展平前后元素类型相同,即T -> Seq<T>,相同的类型签名为其带来了可递归的特性,从而可以推导出树的展平,或者说流式遍历。

flatOptional

有些比较新的库会大量使用Optional,那相应的对多个map之后的Optional,可以使用flatOptional进行展平

// (1, 2, 3, 4) -> flatOptional(i -> Optional.of(i + 10)) -> (11, 12, 13, 14)
default <E> Seq<E> flatOptional(Function<T, Optional<E>> function)

take

提取前n项元素。在Java Stream中这个方法叫limit,在Kotlin Sequence中这个方法叫take,这也是更为标准和通用的叫法

default Seq<T> take(int n)

takeWhile

提取元素直到判定失败,后续的元素丢弃

default Seq<T> takeWhile(Predicate<T> predicate)

除此以外,还有一些衍生方法,例如相邻两两校验

// (1, 1, 1, 2) -> takeWhile(Integer::equals) -> (1, 1, 1)
default Seq<T> takeWhile(BiPredicate<T, T> testPrevCurr)

以及根据映射函数两两校验

// (1, 1, 1, 2, 3) -> takeWhile(i -> i / 2, Integer::equals) -> (1, 1, 1, 2)
default <E> Seq<T> takeWhile(Function<T, E> function, BiPredicate<E, E> testPrevCurr)

drop

丢弃前n项元素,在Java Stream中这个方法叫skip,在Kotlin Sequence中这个方法叫drop,这也是更为标准和通用的叫法

default Seq<T> drop(int n)

dropWhile

丢弃元素直到判定成功,后续元素保留

default Seq<T> dropWhile(Predicate<T> predicate)

onEach

处理元素但不消费,在Java Stream中这个方法叫peek,在Kotlin Sequence中这个方法叫onEach,我们沿用Kotlin的称呼

// (1, 2, 3, 4) -> onEach(i -> print(i)) -> (1, 2, 3, 4) while print each
default Seq<T> onEach(Consumer<T> consumer)

onEachIndexed

带下标处理

// (1, 2, 3, 4) -> onEachIndexed((index, i) -> if (index % 2 == 0) print(i)) -> (1, 2, 3, 4) while print 2, 4
default Seq<T> onEachIndexed(IndexObjConsumer<T> consumer)

cache

流本身是惰性加载的,无论套了多少层mapfilter,实际执行时也是每个元素先做完所有的变换,被处理完成后,才会继续处理下一个元素。很多场景下,有必要用cache对流进行缓存。

流的缓存有两个目的或者说价值,一是流本身的产生是高成本的,例如读一个文本文件,形成String流,当缓存下来后就可以直接重复利用而不是再读一次文件。二是流通过了一次高成本或者难以复现的map,例如做了一次IO,读了随机数,by时间进行了窗口聚合等等。为了确保下次使用时依然是原来的流,或者看重性能,都有必要进行缓存。

可以想到,最直观的将流进行的缓存的方案是把它放进一个List里,或者更进一步,放进SeqList里。然而,无论是ArrayList还是LinkedList,其实现方式在进行未知长度大批量数据连续add的场景下,都存在一些缺陷,前者需要不断的拷贝和丢弃数组,后者每个元素本身开销较大。所以更好的方式是结合二者各自优点,缝合成一个"Linked-Array-List"。

简而言之,就是先像ArrayList那样,每次数组填满后,产出一个长度为当前总size1/2(且有一个上限)的一个新数组,并把它和上一个数组"link"起来,例如

// 假设初始数组size=4,第二个数组size=4/2=2,第三个数组size=(4+2)/2=3
(1, 2, 3, 4, 5, 6, 7, 8, 9) -> ([1, 2, 3, 4]=>[5, 6]=>[7, 8, 9])

由此,可以获得远好于List的性能表现。

流的排序

无论怎么排序,都需要先获取全部元素,将流聚合为一个ArraySeq,而后再做操作,所以排序函数本质上也是聚合函数

sorted/sortedDesc

自然顺序排序

default ArraySeq<T> sorted()

自然顺序逆排序

default ArraySeq<T> sortedDesc()

sortWith/sortWithDesc

使用比较器排序

default ArraySeq<T> sortWith(Comparator<T> comparator)

使用比较器逆排序

default ArraySeq<T> sortWithDesc(Comparator<T> comparator)

sortBy/sortByDesc

按照值函数排序

default <E extends Comparable<E>> ArraySeq<T> sortBy(Function<T, E> function)

按照值函数逆排序

default <E extends Comparable<E>> ArraySeq<T> sortByDesc(Function<T, E> function)

sortCached/sortCachedDesc

有时在使用值函数进行排序时,值函数本身开销较大,每个元素比较大小的复杂度是O(n),故而最好进行缓存。

按照值函数缓存并排序

default <E extends Comparable<E>> Seq<T> sortCached(Function<T, E> function)

按照值函数缓存并逆排序

default <E extends Comparable<E>> Seq<T> sortCachedDesc(Function<T, E> function)

reverse

翻转流,顾名思义,将流的元素从后往前翻转一遍

// (1, 2, 3, 4) -> reverse() -> (4, 3, 2, 1)
default ArraySeq<T> reverse()

runningFold

累加流,即从第一个元素开始,后续每个元素都是前面n项和自己的累加,称之为runningFold

// (1, 2, 3, 4) -> runningFold(0, Integer::sum) -> (1, 3, 6, 10)
default <E> Seq<E> runningFold(E init, BiFunction<E, T, E> function)

distinct

流的标准API之一,元素去重

// (1, 1, 2, 2, 3) -> distinct() -> (1, 2, 3)
default Seq<T> distinct()

distinctBy

指定值函数去重

// (1, 1, 2, 2, 3) -> distinctBy(i -> i / 2) -> (1, 3)
default <E> Seq<T> distinctBy(Function<T, E> function)

流的分段处理

consume

所有流的分段处理,都基于这个分段消费的方法实现,它使得着流的前n项元素和后面的元素可以使用不同的方式消费

default void consume(Consumer<T> consumer, int n, Consumer<T> substitute)

partial

流的部分消费,只按照指定方式消费前n项,后面元素保留,称之为partial

// (1, 2, 3, 4) -> partial(2, System.out::println) -> (3, 4) while print 1, 2
default Seq<T> partial(int n, Consumer<T> substitute) {
    return c -> consume(c, n, substitute);
}

map

map可以指定前n项和后面的剩余项分别使用不同的方式映射,相当于一个随下标变化的函数。

典型的应用场景就是在读取CSV文件时需要对表头和数据采用不同的处理方式,又或者在将下划线转驼峰时,需要对第一个单词小写,并对后续的单词大写。

// (1, 2, 3, 4) -> map(i -> i + 10, 2, i -> i + 100) -> (101, 102, 13, 14)
default <E> Seq<E> map(Function<T, E> function, int n, Function<T, E> substitute) {
    return c -> consume(t -> c.accept(function.apply(t)), n, t -> c.accept(substitute.apply(t)));
}

filter

只在前n项里过滤

// (1, 2, 3, 4) -> filter(2, i -> i % 2 == 0) -> (2, 3, 4)
default Seq<T> filter(int n, Predicate<T> predicate)

onEach

只处理前n项

// (1, 2, 3, 4) -> onEach(2, System.out::println) -> (1, 2, 3, 4) while print 1, 2
default Seq<T> onEach(int n, Consumer<T> consumer) {
    return c -> consume(c, n, consumer.andThen(c));
}

replace

流的部分元素替换,将前n项元素替换为新的值,相当于一个特殊场景的分段map

// (1, 2, 3, 4) -> replace(2, i -> i + 10) -> (11, 12, 3, 4)
default Seq<T> replace(int n, UnaryOperator<T> operator)

窗口函数

所谓窗口函数就是对流的元素按照某种规则进行局部聚合,每一个小组聚合为整体后,构成一个新的流。聚合的逻辑通常有三种,按次数,按时间,按头尾元素特征。

窗口函数也是一种特殊的链式调用。

chunked

每n个元素分为一组

// (1, 2, 3, 4, 5, 6, 7, 8) -> chunked(3) -> ([1, 2, 3], [4, 5, 6], [7, 8])
default Seq<ArraySeq<T>> chunked(int size)

mapSub

相邻若干元素分为一组。

连续满足条件,分为一组,不满足即中断

// (1, 1, 2, 2, 2, 3, 4, 4, 5) -> mapSub(isEven) -> ([2, 2, 2], [4, 4])
default Seq<ArraySeq<T>> mapSub(Predicate<T> takeWhile)

指定分组的聚合方式为toSet(默认为toList

// (1, 1, 2, 2, 2, 3, 4, 4, 5) -> mapSub(isEven, toSet) -> ({2}, {4})
default <V> Seq<V> mapSub(Predicate<T> takeWhile, Reducer<T, V> reducer)

指定分组的开始条件与结束条件,如下示例为奇数开启,偶数结束

// (1, 1, 2, 2, 2, 3, 4, 4, 5) -> mapSub(isOdd, isEven, toList) -> ([1, 1, 2], [3, 4])
default <V> Seq<V> mapSub(Predicate<T> first, Predicate<T> last, Reducer<T, V> reducer)

windowed

按数量滑动开窗,当sizestep相等时,效果等价于chunked

// (1, 2, 3, 4) -> windowed(3, 1, true)  -> ([1, 2, 3], [2, 3, 4], [3, 4], [4])
// (1, 2, 3, 4) -> windowed(3, 1, false) -> ([1, 2, 3], [2, 3, 4])
// (1, 2, 3, 4) -> windowed(3, 2, true)  -> ([1, 2, 3], [3, 4])
// (1, 2, 3, 4) -> windowed(3, 2, false) -> ([1, 2, 3])
default Seq<ArraySeq<T>> windowed(int size, int step, boolean allowPartial)

windowedByTime

按时间开窗,需要热流和异步流发布后才能完全发挥价值,效果与windowed类似,只不过将数量窗替换为了时间窗。

指定时间窗、步长、聚合方式开窗

default <V> Seq<V> windowedByTime(long timeMillis, long stepMillis, Reducer<T, V> reducer)

指定时间窗、步长开窗,默认聚合方式为toList

default Seq<ArraySeq<T>> windowedByTime(long timeMillis, long stepMillis)

指定时间窗开窗,默认聚合方式为toList,默认滚动模式

default Seq<ArraySeq<T>> windowedByTime(long timeMillis)