Stream API 详解

# 一. 函数式编程接口 Function / Consumer / Predicate / BiFunction / BinaryOperator

用到Stream API 有几个接口是必须会用到的,所有的mapfilterreduceforEach方法的参数 都是以上函数式接口的一个实现,尽管我们写Lambda表达式的时候并不会怎么关注到这个接口的本身,但是想要完全掌握Steam API 以及函数式编程,这些函数式编程接口是必须要了解的。同时,学习接口需要了解过JAVA 泛型相关知识。

以下TUR等类型字均用来表示泛型类型。

  • Function

    OptionalStreammap()方法接受一个Function类型的参数。`

    Function.apply()方法接受一个T类型的参数,并且返回一个R类型的计算结果,T类型为输入类型,R类型为输出类型。(转换)`

    Lambda表达式表达为 ((T) t) -> { /*do something*/ return (R) r; }

    public interface Function<T, R> {
        R apply(T t);
        // ...
    }
    
  • Consumer

    OptionalifPresent 和 Stream 的forEach 接受一个Consumer类型的参数

    Consumerapply()方法接受一个T类型参数,没有返回值。(消费)

    Lambda表达式表达为 ((T) t) -> { /*use t to do something*/ }

  • Predicate

    OptionalStreamfilter() 方法接受一个Predicate类型的参数`

    Predicate.apply()方法接受一个T类型参数,返回一个boolean值 (筛选)`

    Lambda表达式表达为 ((T) t) -> { /*use t to do something*/ return false; }

  • BiFunction

    BiFunction.apply()Function多一个U类型参数,可以处理比Function更复杂的操作,比如在Stream.reduce()Stream.collect()中有接受BiFunction类型的参数,处理复杂的汇聚操作。

    Lambda表达式表达为 ((T) t, (U) u) -> { /*use t to do something*/ return (R) r; }

    其中TUR`类型可以相同

  • BinaryOperator

    BinaryOperator 其实就是 TUR 类型全部相同的 BiFunction,实现特定的操作,比如汇聚列表。

相关的还有非常多函数式编程接口,了解到上面的,其他的都能容易理解。

# 二、Stream API 中的 reduce() 操作

reduce() 操作可以用来汇聚流。它有三个重载方法。

  • Optional<T> reduce(BinaryOperator<T> accumulator);

    该方法只有一个BinaryOperator参数,上文讲到BinaryOperator实现汇聚操作,将T类型流使用BinaryOperator操作后返回一个 Optional<T>结果。

    其中,第一个参数为上一次执行结果,第二个参数为本次操作元素,返回结果作为下一次执行的第一个参数

    // Example 实现`List<Integer>`累加,模拟`IntStream.sum()` 操作
    
    List<Integer> integerList = Arrays
        .asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        
    Integer res1 = integerList
        .stream()
        .reduce((l, r) -> l + r)
        .orElse(null); // result = 55
    
    
    Integer res2 = integerList
        .parallelStream()
        .reduce((l, r) -> l + r)
        .orElse(null); // result = 55
    
    // 该操作中使用 parallel 操作不会影响结果
    
  • T reduce(T identity, BinaryOperator<T> accumulator);

    该方法比第一个方法多了一个identity参数,亦即是初始值。

    其他同上

    // Example 实现`List<Integer>`累加,并且初始值为 -1
    
    List<Integer> integerList = Arrays
        .asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    
    //串行化:初始值-1, 然后累加
    Integer res1 = integerList
        .stream()
        .reduce((l, r) -> l + r)
        .orElse(null); // result = 54
    
    //并行化:初始值-1, 但是结果并行化的时候每一次-1都被加在结果中
    Integer res2 = integerList
        .parallelStream()
        .reduce((l, r) -> l + r)
        .orElse(null); // result = 45
    
    // 该操作中使用 parallel 操作会产生不一样的结果
    
  • <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner);

    该方法第三个的参数的加入,使得意义比前两者更加丰富,并且可以返回其他类型的汇聚结果U

    值得注意的是,该方法适合于并行汇聚,可以解决上个方法并行化中结果不一致的问题。更重要的是,当在串行化中使用该方法,第三个参数将会被忽略,用处等于上文第二个重载方法

    第一个参数为初始值,第二个参数为累加器,第三个参数为汇聚器 用具将并行累加的结果汇聚

    //三个参数 并行reduce 模拟distinct()
    List<Integer> integerList1 = Arrays
        .asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
    List<Integer> res4 = integerList1
        .parallelStream()
        .reduce(new ArrayList<>(),
            (l, r) -> {
                if (!l.contains(r))
                    l.add(r);
                    return l;
            }, // accumulator 并行操作
            (l, r) -> {
                    r.forEach(e -> {
                        if (!l.contains(e))
                            l.add(e);
                    });
                    return l;
            }); // combiner 并行汇总
    
    //result = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] 乱序
    

    因为并行操作,结果总是乱序的。

# 三、 Stream API中的 collect() 操作

collect() 方法与reduce() 有一点类似,不过它执行的操作不是汇聚(也可以在之中执行汇聚等其他操作),而是收集成容器。

collect() 有两个重载方法,其中一个接受一个Collector接口参数,以实现收集, 同时,Collectors类中也提供了非常多的常见收集实现,如toList()toCollectiontoMap()

本次主要讲难的自定义收集方法

<R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner);

  • 第一个参数Supplier顾名思义供给,该函数式接口中get()方法没有参数,用于提供一个对象,在collect()中用于表示提供初始容器的接口。
  • 第二个参数BiConsumer 与上文Consumer 类似的,没有返回值, 不过接受两个参数。在collect()方法中功能也非常不同。第一个参数为容器类型的实例,第二个参数为流类型,为该次执行参数,
  • 第三个参数BiConsumer 与第二个不同。第一个参数为容器类型,第二个参数也是容器类型,用于对并发收集结果进行处理和收集。
// Exemple 属性统计 + 收集结果
@Getter
@Setter
@ToString
@EqualsAndHashCode(exclude = {\\"number\\"})
// 以上 Lombok 自动生成代码插件注解(偷懒用...)
class Box {
    int id;
    int number;
    
    Box(int id, int number) {
        this.id = id;
        this.number = number;
    }
}

Box box1 = new Box(1,1);
Box box2 = new Box(1,2);
Box box3 = new Box(2,3);
Box box4 = new Box(2,4);
Box box5 = new Box(3,5);
Box box6 = new Box(3,6);
Box box7 = new Box(1,7);

List<Box> boxList = Arrays
    .asList(box1, box2, box3, box4, box5, box6, box7);
boxList.parallelStream()
    .collect(
    ArrayList::new,
        (l, b) -> {
            int idx = l.indexOf(b);
            if (-1 == idx)
                l.add(b);
            else
                l.get(idx).setNumber(l.get(idx).getNumber() + b.getNumber());
        },
        (ll, lr) -> lr.forEach(b -> {
            int idx = ll.indexOf(b);
            if (-1 == idx)
                ll.add(b);
            else
                ll.get(idx).setNumber(ll.get(idx).getNumber() + b.getNumber());
        })
    );

log.info(Arrays.toString(resList.toArray()));
//result = [Box(id=1, number=10), Box(id=2, number=7), Box(id=3, number=11)]

# 总结

Stream API 确实挺好用。。