Java Stream Pipeline 流水线(一):Stream 基础

前言

本文是由笔者所原创的 《Java Stream Pipeline 流水线系列》文章之一,是继《深入 Java Lambda 系列》的后续系列文章;本文将会重点就 Java 流水线 Stream 的相关特性进行描述;

本文为作者原创作品,转载请注明出处;

概述

本文将重点提炼 State of the Lambda - Libraries Edition: http://cr.openjdk.java.net/~briangoetz/lambda/sotc3.html 中的官网内容,概要其核心内容;笔者并不会逐字逐句的翻译,而是将自己所理解的内容进行整理和总结。

特性

内部迭代器

使用内部迭代的方式是 Stream 既流式计算的一个设计理念,通过内部迭代器的方式,将流式计算的大量复杂的流程进行了封装;与内部迭代器( internal iteration )相对应的自然就是外部迭代器( external iteration ),外部迭代器的优势自然是给了用户极大的创作空间,但是相应的弊端也非常的明显,那就是太过于随意;实际上,笔者认为,在流式计算兴起以前,外部的或者是内部的,其实都无所谓,因为没有那么大量的数据,所以怎么迭代都好;但是,在流式计算兴起以后,特别是像具有互联网这种特性的数据处理过程,往往都是源源不断,永无止境的数据流入并需要处理,当流式计算在互联网中显得格外重要的时候,必然需要对流的处理的方式进行更好的封装便于方法和逻辑的重用,而流的处理是建立在流的迭代方式上的,自然就需要对流的迭代方式进行封装,也就自然需要一个所谓的“内部迭代器”;只有这样,我们才能够对流的处理进行封装,使得其功能和逻辑得以重用;一个典型的例子便是 Parallel Stream,通过“内部迭代器”使用 Fork and Join 架构实现并行处理,如果仍然是使用“外部迭代器”的方式自然就没有办法对 Fork and Join 架构进行统一的封装并对相应方法进行重构了。

最后来看一个“外部迭代器”和“内部迭代器”的例子,

“外部迭代器”

1
2
3
4
5
for (Shape s : shapes) {
if( s.getColor() == BLUE )
s.setColor(RED);
}
}

“内部迭代器”

1
2
3
shapes.stream()
.filter(s -> s.getColor() == BLUE)
.forEach(s -> { s.setColor(RED); });

可见,内部迭代器通过一些标准的接口比如 filter 和 forEach 将外部迭代器的外部循环逻辑给替代了,这样,一来便于用户只需要关注自己的核心部分代码实现,而来便于 Stream 在架构层面进行重构。

懒惰的

懒惰的

对于流式计算,可以采取两种方式来实现,一种是 Eagerly(殷切的),一种是 Lazniess(懒惰的);笔者使用这样一个例子来让大家迅速明白这两者这件的区别,假设我们有这样一个需求,有这样一组队列,

1
Arrays.asList("abc", "bdfafda", "ab", "abdace");

计算过程是,先过滤 filter,筛选出首字母为 “a” 的字符串,然后统计各个字符串的长度,最后返回最大的长度值;下面来看看 Eagerly 和 Lazniess 设计实现上的区别,

  • Eagerly
    首先,它会遍历数组中所有的元素,然后找到所有的满足首字母为 “a” 的元素,生成一个新的字符数组返回;
    然后,遍历上一步返回的所有结果,计算每一个字符串的 size,生成一个新的 int 数组返回;
    最后,从返回的 int 数组中筛选出最大值;
    这就是“殷切的”的最直观的体现,核心思想便是,当执行完这一步以后,才开始执行下一步。
  • Lazniess
    当遇到 filter,统计字符长度等操作的时候,它都不会开始计算;只有当遇到结束操作,比如统计最大值 max 方法后,才会开始计算;所以,它的中间计算过程既 filter 和 mapToInt 都是懒惰的,而只有结束操作 max 是 Eagerly(殷切的);比如,我们将上述的步骤用流表述为,

    1
    2
    3
    4
    strings.stream()
    .filter(s -> s.startsWith("a"))
    .mapToInt(String::length)
    .max();

    也就是说,只有当执行到 max() 方法以后,才会开始执行 filter()、mapToInt() 直到最后的结束操作 max();并且注意,整个执行过程中,并不是等 filter 执行完以后,才开始执行 mapToInt,等 mapToInt 执行完成以后,才开始执行 max,而是一个元素会执行完整个流程,比如说元素 “abc”,它会先执行 filter,满足需求,filter 直接将 “abc” 发送给 mapToInt 处理,mapToInt 计算得到长度为 3,然后将 3 直接发送给 max,等这一系列的动作完成以后,才会执行下一个元素 “bdfafda”。

所以这里可以总结得出,Java Stream 包含惰性方法和殷切的方法,只是要注意的是所有的结束操作都是都是殷切的。

短路操作

上一小节中提到了触发整个 Stream 开始执行的相关操作,当中提到了 max 方法;该方法是一个常规的结束方法,该结束方法将会触发 Stream 开始执行,这个过程也就是所谓的惰性;不过有一些特殊的结束方法,在 Stream 的执行过程中,还没有处理完所有元素的情况下,便可以终止流式计算,这种特殊的结束方法被称作为“Short-Circuiting”既是短路操作;常用的 findFirst、anyMatch 等等,比如

1
2
3
Optional<Shape> firstBlue = shapes.stream()
.filter(s -> s.getColor() == BLUE)
.findFirst();

上面这个例子,只要找到第一个元素即可立即返回并直接终止当前的流计算。

side-effect

这里谈谈 side-effect 既是副作用,在官网中很多地方出现了这个词,其实很多网络博文对此理解不准确,这也难怪,因为官网对此概念一直是模模糊糊的;其实对 side-effect 正确的理解是,允许出现副作用的地方,因此在官方文档中有这么一个概念 side-effect-producing operations 就是说可以产生副作用的操作集,典型的就是 forEach(action) 方法,在这个方法中,允许产生副作用,比如修改源数据中的某些元素,或者自定义将元素保存到用户自定的队列中;看看下面这个用例,

1
2
3
4
5
6
7
8
9
10
11
List<Shape> list = new ArrayList<Shape>();

Collections.addAll(list, new Shape("RED"), new Shape("BLUE"), new Shape("BLUE"));

list.stream()
.filter(s -> s.getColor().equals("BLUE"))
.forEach( s -> {
s.setColor("GRAY");
});

list.forEach( s -> { System.out.println(s.getColor()); } );

逻辑很简单,就是将 BLUE Shape 改为 GRAY;输出结果,

1
2
3
RED
GRAY
GRAY

可见 forEach(action) 是允许发生副作用的地方;那么将这个例子稍微改动一下,我们便可以将结果保存到用户自定义的队列 storage 中;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
List<Shape> list = new ArrayList<Shape>();

Collections.addAll(list, new Shape("RED"), new Shape("BLUE"), new Shape("BLUE"));

List<Shape> storage = new ArrayList<Shape>();

list.stream()
.filter(s -> s.getColor().equals("BLUE"))
.forEach( s -> {
s.setColor("GRAY");
storage.add(s);
});

storage.forEach( s -> { System.out.println(s.getColor()); } );

输出结果,

1
2
GRAY
GRAY

Collectors

Collectors 故名思议,既是收集器

在前面的大量的用例中,我们已经看到通过使用方法 collect() 将流式处理的结果输出到一个 List 或者 Set 中;collect() 方法的参数是 Collector 接口对象,通过构造该接口对象解耦了 List、Set、Map 之间的异构性;Collectors 工具类为一些常用的 collectors 提供了构造工厂类,比如 Collectors.toList()、Collectors.toSet();前面的介绍中,我们已经看到了大量的有关 list 和 set 相关的用例,稍微复杂一点的是 toMap(),

  1. toMap()
    下面让我们来看一个 toMap 的例子,比如我们想要根据 catalog number 为所有的专辑创建一个索引,可以用如下的方式,

    1
    2
    3
    Map<Integer, Album> albumsByCatalogNumber =
    albums.stream()
    .collect(Collectors.toMap(a -> a.getCatalogNumber(), a -> a));

    这样,将会返回一个以 catalog number 为 Key,album 为 Value 的 Map;

  2. groupingBy()
    与 toMap 相关的是 groupingBy;比如我们有这样一个需求,就是让专辑中的每一首评分高于 4 分的歌按照其作者(artist)进行归类,我们可以用如下的方式,

    1
    2
    3
    4
    Map<Artist, List<Track>> favsByArtist =
    tracks.stream()
    .filter(t -> t.rating >= 4)
    .collect(Collectors.groupingBy(t -> t.artist));

    这样我们得到了一个 Map,其中的 Track(歌曲)按照其作者进行了分类存储;可以看到,上述默认将 Track 构造为了 List,但是往往 Track 会有重复的情况出现,如果是这样,我们可以使用如下的方式过滤掉重复的 Tracks,

    1
    2
    3
    4
    5
    Map<Artist, Set<Track>> favsByArtist =
    tracks.stream()
    .filter(t -> t.rating >= 4)
    .collect(Collectors.groupingBy(t -> t.artist,
    Collectors.toSet()));

    通过使用 toSet() 将结果保存到 Set 中,通过这样的方式将重复的 tracks 过滤掉;如果我们有这样一个更为复杂的需求,我们想要知道同一个 Artist 的所有 Tracks 同时对所有相同评分的 Tracks 进行分组,那么这个时候,我们可以通过多层的 groupingBy() 来构建一个双层的 Map 来实现,

    1
    2
    3
    4
    Map<Artist, Map<Integer, List<Track>>> byArtistAndRating =
    tracks.stream()
    .collect(groupingBy(t -> t.artist,
    groupingBy(t -> t.rating)));

    最后,来看一个更好玩的,统计歌曲名( Track Name )中所使用单词的频率,并用降序进行排列,

    1
    2
    3
    4
    5
    6
    Pattern pattern = Pattern.compile("\\s+");
    Map<String, Integer> wordFreq =
    tracks.stream()
    .flatMap(t -> pattern.splitAsStream(t.name)) // Stream<String>
    .collect(groupingBy(s -> s.toUpperCase(),
    counting()));

    要注意 flatMap 的用法,它接受一个返回结果为 Stream R 的 lambda 表达式作为参数,

    1
    <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper);

    并且将 R 作为流传递给 downstream,参考下面的源码;
    ReferencePipeline.java

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    @Override
    public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
    Objects.requireNonNull(mapper);
    // We can do better than this, by polling cancellationRequested when stream is infinite
    return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
    @Override
    Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
    return new Sink.ChainedReference<P_OUT, R>(sink) {
    @Override
    public void begin(long size) {
    downstream.begin(-1);
    }

    @Override
    public void accept(P_OUT u) {
    try (Stream<? extends R> result = mapper.apply(u)) {
    // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
    if (result != null)
    result.sequential().forEach(downstream);
    }
    }
    };
    }
    };
    }

    注意上述代码第 20 行,逐个迭代 R Stream 中的元素并交给 downstream 既是下游 Sink 进行处理,由此可知,flatMap 的作用就是将输入 T 转换成另外一个 Stream R,然后进行流式计算,回到当前的这个例子中便是,将 Track Name 转换成一个一个的单词流,然后进行后续的处理,后续交给 groupBy() 进行处理,根据不同的单词进行分组并计算 count;

并行处理

并行处理

While the use of internal iteration makes it possible that operations be done in parallel, we do not wish to inflict any sort of “transparent parallelism” on the user. Instead, users should be able to select parallelism in an explicit but unobtrusive manner.

上述这段话描述了 Stream 并行处理的设计初衷,作者并不想让用户对并行处理是完全透明的,相反,用户可以手动的选择是否要使用并行处理;通过如下的方式,

1
2
3
4
int sum = shapes.parallel()
.filter(s -> s.getColor() == BLUE)
.map(s -> s.getWeight())
.sum();

这样,通过调用 Stream.parallel() 方法便会得到一个具有并发处理特性的 Stream 对象;上述代码中,shapes 表示的是一个 Stream 对象。

并行处理的底层原理

The steps involved in implementing parallel computations with Fork/Join are: dividing a problem into subproblems, solving the subproblems sequentially, and combining the results of subproblems. The Fork/Join machinery is designed to automate this process.

上面这段话描述了 Parllel Stream 的实现原理,将一个大的问题拆分成小的问题,按顺序的解决小的问题,然后依次将子任务的结果合并(备注,这正是 CountedCompleter 所能够保证的);新的 Fork/Join 机制被设计用来自动的实现该流程。原理性的东西不再照本宣科,不去逐字逐句的翻译官网上的内容;总体而言,如果要将大问题才分成小的问题,就必须利用一个新的工具,而这个工具正是 Spliterator 接口,有关该接口的详细内容笔者将在后续的博文中单独进行介绍,

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface Spliterator<T> {
// Element access
boolean tryAdvance(Consumer<? super T> action);
void forEachRemaining(Consumer<? super T> action);

// Decomposition
Spliterator<T> trySplit();

// Optional metadata
long estimateSize();
int characteristics();
Comparator<? super T> getComparator();
}
  • trySplit() : Spliterator<T>
    该方法既是将一个 Collection / Iterator 进行分解,生成更多小的 Collection / Iterator;这就是上面所描述的将一个大问题拆分成小的问题的具体实现方式。
  • estimateSize() : int
    获取当前 Spliterator 迭代器中元素的 size,当然,如果碰到源源不断流入的数据流,它的大小当然是不可知的。
  • tryAdvance(action)
    一次遍历一个元素;该接口功能将会在 Spliterator 博文中详细描述;
  • forEachRemaining(action)
    一次遍历所有元素;该接口功能将会在 Spliterator 博文中详细描述;
  • characteristics()
    该接口功能将会在 Spliterator 博文中详细描述;

Encounted Order

首先,笔者要问的是什么是 Encounted Order?先来看官网上的一段描述,

Many data sources, such as lists, arrays, and I/O channels, have a natural encounter order, which means the order in which the elements appear has significance. Others, such as HashSet, have no defined encounter order

上面的描述,说明了 Encounted Order 的一些性质,主要是说明 lists, arrays 和 I/O channels 都有自然的 Encounted Order,其它的,比如 HashSet 就没有 Encounted Order;但是,仅凭上面的这段官网描述,我们依然不知道 Encounter Order 到底是什么呢?不过可以大概的嗅到它的味道;笔者经过反复思考,慎重的给出了它的定义,如下,

Encoutned Order 就是一种所见即所得的顺序;什么意思呢?就是说,从队列中取出元素的顺序既是你存放该元素时候的顺序;比如,

1
2
3
collection.add("a");
collection.add("b");
collection.add("c");

如果使用 collection 迭代器依次迭代元素的话,输出的元素的顺序也就是 “a”, “b”, “c”;满足这种特性的迭代顺序,就称作 Encounted Order;

所以,这里要重点说明的是,Parallel Stream 同样满足这样的特性;比如,

1
2
3
List<String> names = people.parallelStream()
.map(Person::getName)
.collect(toList());

上面所输出的名字,即便是采用并发处理的方式也同样会严格按照 people 队列中 person 所出现的 Encounted Order 所进行排列;笔者将会在后续的 Parallel Stream 源码分析的博文中详细的介绍,其实这种特性是由继承自 CountedCompleter 的 java.util.stream.AbstractTask 中的并发执行逻辑所能保证的。

原始类型的 Streams

备注,官网的该章节的内容的介绍与 Java SE 8 SDK 的真实实现有差异,该差异笔者将会在本章节结束的部分进行描述,由此可见,该部分内容应该仅仅是对于 Primitive streams 实现之前的构思

我们期望通过如下的样例代码块对原始类型( int )进行流式计算,

1
2
3
4
List<String> strings = ...
int sumOfLengths = strings.stream()
.map(String::length)
.reduce(0, Integer::plus);

通过这样的方式,能够不必考虑原始类型与引用类型之间的差异(备注,通过将原始类型自动封装成为引用类型后,这样便可以统一的使用引用类型来进行处理);但是这样做,却会引来性能上的问题,那就是频繁的在原始类型和引用类型之间进行封装和解封装,而这个性能的损耗在对性能要求极为苛刻的流式计算中是不能被接受的;所以这个看似完美的解决方案,其实并不可取。

所以留给我们的只有如下的两种解决方式,

  1. 通过使用 IntMapper 作为参数返回一个 IntStream 对象的方式对 map 方法进行重载,由此为 int stream 提供一种独立的抽象。
  2. 通过提供一系列相容的方法(Fuse common pairs of operations),比如提供能够接受 IntMapper + IntOperator 作为参数 mapreduce 方法。

这里作者认为上述的方案 #2 并不成熟,甚至于不能作为一个完整的解决方案;下面是作者针对方案 #1 的一些解决办法的描述,

1
2
3
4
List<String> strings = ...
int sumOfLengths = strings.stream()
.map(String::length)
.sum();

使用上述的方式就需要能够使得 map(Function<T,U>) 能够被 map(IntMapper<T>) 的方式所重载,然后提供一些原始类型所特有的 Stream 对象,诸如 IntStream、DoubleStream 等等;所以,Stream 的接口类似下面这样,

1
2
3
4
5
6
7
interface Stream<T> {
...
<U> Stream<U> map(Mapper<T,U>);
IntStream map(IntMapper<T>);
LongStream map(LongMapper<T>);
DoubleStream map(DoubleMapper<T>);
}

所以,从官网的这段描述中可以看到,作者的初衷是想使用方案 #1 通过对 map 方法进行重载的方式更为动态的实现 Primitive Stream;但是正如笔者在该章节开始所描述的那样,JDK 8 并没有按照 #1 的方案实施,而是使用的方案 #2,既是使用 fused-operation approach;从 Stream 的接口最终实现就可以看出,

Stream<T>.java

1
2
3
4
<R> Stream<R> map(Function<? super T, ? extends R> mapper);
IntStream mapToInt(ToIntFunction<? super T> mapper);
LongStream mapToLong(ToLongFunction<? super T> mapper);
DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper);

可见分别提供了对各种原始数字类型的 fused-operations 来处理原始类型的流式计算;所以,上述样例代码最终的实现方式如下所述,

1
2
3
4
List<String> strings = ...
int sumOfLengths = strings.stream()
.mapToInt(String::length)
.sum();

注入代码第 3 行,使用的是方法 mapToInt 而非重载过后的 map;笔者猜想,之所以方案 #1 最终并没有落地,估计是方案 #1 中有一个比较难以实现的前提,那就是需要使得 map(Function<T,U>) 方法能够被 map(IntMapper<T>) 所重载,而这种方式是 Java 的重载默认不提供的,因为毕竟参数类型有极大的不同,如果要强制实现,那么这里必然针对这个特例对 JVM 底层源码进行修改,且难度可想而知。

其它特性

Nulls

有些 Collections 中的元素允许 null 值,但有些不允许;我们有三种不同的处理 nulls 方式供 Stream 来实现,

  1. Error
    如果发现 Stream 中有元素为 null 值,则抛出异常;这种方式实际上就是不允许 nulls。
  2. Ignore

    Ignore nulls in streams.

    这里直译为直接忽略掉;但是其实真正的含义是,跳过 nulls,不处理它。

  3. Agnostic
    意思是,Stream 本身不对 nulls 做任何的处理;如何处理交由用户自己决定。

Stream 的设计者采用第三种方案;交由用户自己去如何处理 Nulls;比如,如果不允许 Nulls 发生,可以使用如下代码,

1
filter(e -> { (if e == null) throw new NPE(); } )

如果,想直接 Ingore,使用

1
filter(e -> e != null)

Non-interference

Because the stream source might be a mutable collection, there is the possibility for interference if the source is modified while it is being traversed. The stream operations are intended to be used while the underlying source is held constant for the duration of the operation.

上面描述了因为 Collection 中的元素往往是引用对象,因此该元素是允许被修改的(即便是在 Collection 对象上加上 final 限制);而 Stream 的相关操作是建立在将 collection 中的元素视为常量的基础之上的,所以 Stream 操作的限制就是不允许对源数组中的数据进行修改;因此引用对象本身可以被修改的这个特性与此相悖。

笔者补充,其实这个限制并不新鲜,Java 的 Iteratble 接口早就有这种限制了,比如 AbstractList 中的属性 modCount 那样,记录了当前 list 中的修改次数,然后通过使用 变量记录迭代器中 list 的修改次数,如果在 list 的迭代过程中,发现这两个值不相等,既 modCount ≠ expectedModCount,那么将会抛出 ConcurrentModificationException 异常,表示在迭代过程中,源 list 中的数据被更改过了,这时强制不允许的。同样的,这个限制也适用于流式计算既 Stream,从后续的内容中可以知道,Stream 的迭代器是 Spliterator,也就是说该限制最终落地到的是 Spliterator 对象上的。

无限的流

Unlike a Collection, there is nothing about a stream that requires it to be of finite size. While certain operations on an infinite stream (such as forEach) would never terminate under normal conditions, there are many operations that can deal perfectly well with infinite streams (e.g., limit will truncate a stream; findFirst or findAny will terminate as soon as they find a match, etc.) Similarly, an infinite stream can be turned into an Iterator and iterated directly.

上述描述中,最重要的一段话是最后一句,既是可以将 Infinite Stream 转换成 Iterator 来进行处理。后续中我们可以看到,在处理 I/O 输入流的过程中,正式通过 I/O 的输入流转换成 Iterator 的方式来实现这种无限流的处理的,具体用例可以参考 BufferedReader.lines() : Stream 方法。

Streams and Lambda in JDK

创建 Stream 的种种方式

  1. Collection 接口提供了 stream() 和 parallelStream() 来获取 Stream 对象;
  2. Arrays 工具类提供了 stream() 方法来为数组对象获取 Stream 对象;
  3. 可以使用如下 Stream 接口中的静态工厂类来创建 Stream 对象,
    Stream.of
    Stream.generate
    IntStream.range
    String.chars
  4. 可以通过某些特殊类的静态方法获得 Stream 对象,
    String.chars
    BufferedReader.lines
    Pattern.splitAsStream
    Random.ints
    BitSet.stream

Comparator 工厂

这部分主要描述的是 lambda 相关的内容,

Comparator.comparing()

1
2
3
4
5
public static <T, U extends Comparable<? super U>> Comparator<T> comparing(
Function<? super T, ? extends U> keyExtractor) {
return (c1, c2)
-> keyExtractor.apply(c1).compareTo(keyExtractor.apply(c2));
}

该静态方法接收一个 Function 对象既 keyExtractor,通过 keyExtractor 进行 T -> U 映射,也就是说,最终比较的是 $U_1$ 和 $U_2$;方法内部很有意思,通过 lambda 表达式返回的是一个 Comparator Functional Interface,并接收 _c1_ 和 _c2_ 作为参数进行比较;来看看相关的用例,

1
2
List<Person> people = ...
people.sort(comparing(p -> p.getLastName()));

这个例子中,people 充当的就是 _T_ 同时也充当的是参数 _c_,而 p.getLastName() 充当的就是 _U_;上面的例子默认是按照字母升序的方式进行排列的,那么如果我们想要按照降序的方式进行排列呢?

1
people.sort(comparing(p -> p.getLastName()).reversed());

如果我们想要实现 order by Last Name then First Name 呢?

1
2
3
Comparator<Person> c = Comparator.comparing(p -> p.getLastName())
.thenComparing(p -> p.getFirstName());
people.sort(c);

Mutative collection operations

Non-interference 中我们知道,Stream 的操作对源数据有严格的限制,既是将源当做 Constant,不允许修改;但有时候,我们期望借助 Stream 的特性对源数据进行集中的修改;为了满足这个特性,一些新的方法加入了 CollectionListMapIterable 接口中来利用 lambda 和 Stream 的特性;

  • Iterable
    Iterable.forEach(Consumer)
  • Collection
    Collection.removeAll(Predicate)
  • List
    List.replaceAll(UnaryOperator)
    List.sort(Comparator)
  • Map
    Map.computeIfAbsent()

实战

filter and forEach

找到蓝色的元素,并将其修改为红色;

1
2
3
shapes.stream()
.filter(s -> s.getColor() == BLUE)
.forEach(s -> { s.setColor(RED); });

shapes 是一个 Stream 对象。

使用 collect 将结果保存到新的队列中

  1. 过滤出 BLUE 元素,并将这些元素保存到新的队列 ArrayList 中;

    1
    2
    3
    List<Shape> blue = shapes.stream()
    .filter(s -> s.getColor() == BLUE)
    .collect(new ArrayList<>());
  2. 如果每一个 Shape 都包含在一个 Box 中,那么我们想统计哪些 Box 包含了 BLUE 元素

    1
    2
    3
    4
    Set<Box> hasBlueShape = shapes.stream()
    .filter(s -> s.getColor() == BLUE)
    .map(s -> s.getContainingBox())
    .collect(new HashSet<>());

sum

统计所有蓝色形状物体的重量

1
2
3
4
int sum = shapes.stream()
.filter(s -> s.getColor() == BLUE)
.map(s -> s.getWeight())
.sum();

sort

假设我们有这样一个需求,找到某个专辑中至少有一首歌的评分为 4 颗星以上的专辑的名字,结果通过名称进行升序排列;

传统的实现方式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
List<Album> favs = new ArrayList<>();
for (Album a : albums) {
boolean hasFavorite = false;
for (Track t : a.tracks) {
if (t.rating >= 4) {
hasFavorite = true;
break;
}
}
if (hasFavorite)
favs.add(a);
}
Collections.sort(favs, new Comparator<Album>() {
public int compare(Album a1, Album a2) {
return a1.name.compareTo(a2.name);
}});

使用 Java Stream

1
2
3
4
5
List<Album> sortedFavs =
albums.stream()
.filter(a -> a.tracks.anyMatch(t -> (t.rating >= 4)))
.sorted(comparing(a -> a.name))
.collect(new ArrayList<>());

flatMap

将一个输入的元素 T 转换成一个输入流 Stream R 并交由下游系统进行处理;比如我们要统计某个单词出现的频率,就可以将每一首歌曲的名字通过 flatMap 转换成 R 并交由下游系统进行处理;

1
2
3
4
5
6
Pattern pattern = Pattern.compile("\\s+");
Map<String, Integer> wordFreq =
tracks.stream()
.flatMap(t -> pattern.splitAsStream(t.name)) // Stream<String>
.collect(groupingBy(s -> s.toUpperCase(),
counting()));

该例子的详情参考 Collectors 章节;

References

本文的撰写参考了如下的两则引用,

State of the Lambda - Libraries Edition sotc3: http://cr.openjdk.java.net/~briangoetz/lambda/sotc3.html
State of the Lambda - Libraries Edition final: http://cr.openjdk.java.net/~briangoetz/lambda/lambda-libraries-final.html