Java Stream详解

Contents
  1. lambda的作用域
    1. 局部变量
    2. this
    3. 引用的变量不可更改
  2. Stream
    1. Stream简介
    2. parallelStream简介
  3. parallelStream背后的男人:ForkJoinPool
    1. 工作窃取算法
    2. 用看forkjoin的眼光来看ParallelStreams
  4. ParallelStreams的陷阱
  5. 总结

在讨论Stream之前,我们有必要介绍一下lambda的作用域,因为stream通常是与lambda搭配来使用的,并且最近在工作中也遇到了作用域上面的问题(无法访问接口的默认方法)。

lambda的作用域

在Lambda中,变量的作用域与访问操作主要遵循以下规则:

  • 本地变量(Local Variable)可以访问但是不可以修改
  • 类成员变量与静态变量可以被读写,即闭包中的this实际指向的是创建该Lambda表达式的方法的this参数
  • 函数式接口的默认方法不可以在Lambda表达式中被访问

局部变量

lambda表达式的方法体与嵌套代码块有着相同的作用域。因此它也适用同样的命名冲突和屏蔽规则。在lambda表达式中不允许声明一个与局部变量同名的参数或者局部变量。

1
2
3
4
Path first = Paths.get("/usr/bin");
Comparator<String> comp = (first, second) ->
Integer.compare(first.length(), second.length());
// 错误,变量first已经定义了

在一个方法里,你不能有两个同名的局部变量,因此,你也不能在lambda表达式中引入这样的变量。在下一个示例中,lambda表达式有两个自由变量,text和count。数据结构表示lambda表达式必须存储这两个变量的值,即“Hello”和20。我们可以说,这些值已经被lambda表达式捕获了(这是一个技术实现的细节。例如,你可以将一个lambda表达式转换为一个只含一个方法的对象,这样自由变量的值就会被复制到该对象的实例变量中)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package java8test;
public class T1 {
public static void main(String[] args) {
repeatMessage("Hello", 20);
}
public static void repeatMessage(String text, int count) {
Runnable r = () -> {
for(int i = 0; i < count; i++) {
System.out.println(text);
Thread.yield();
}
};
new Thread(r).start();
}
}

this

当你在lambda表达式中使用this关键字,你会引用创建该lambda表达式的方法的this参数,以下面的代码为例:

1
2
3
4
5
public class Application {
public void doWork() {
Runnable runner = () -> {....;System.out.println(this.toString());......};
}
}

表达式this.toString()会调用Application对象的toString()方法,而不是Runnable实例的toString()方法。在lambda表达式中使用this,与在其他地方使用this没有什么不同。lambda表达式的作用域被嵌套在doWork()方法中,并且无论this位于方法的何处,其意义都是一样的。

引用的变量不可更改

Lambda表达式可以捕获闭合作用域中的变量值。在Java中,为了确保被捕获的值是被良好定义的,需要遵守一个重要的约束。在lambda表达式中,被引用的变量的值不可以被更改。例如,下面这个表达式是不合法的:

1
2
3
4
5
6
7
8
9
10
public static void repeatMessage(String text, int count) {
Runnable r = () -> {
while(count > 0) {
count--; // 错误,不能更改已捕获变量的值
System.out.println(text);
Thread.yield();
}
};
new Thread(r).start();
}

不要指望编译器会捕获所有并发访问错误。不可变的约束只作用在局部变量上,如果是一个实例变量或者闭合类的静态变量,那么不会有任何错误被报告出来,即使结果同样不确定。同样,改变一个共享对象也是完全合法的,即使这样并不恰当。例如:

1
2
3
4
List<Path> matches = new ArrayList<>();
for(Path p : files)
// 你可以改变matches的值,但是在多线程下是不安全的
new Thread(() -> {if(p中包含某些属性) matches.add(p);}).start();

注意matches是“有效final”的(一个有效的final变量被初始化后,就永远不会再被赋一个新值的变量)。在我们的示例中,matches总是引用同一个ArrayList对象,但是,这个对象是可变的,因此是线程不安全的。如果多个线程同时调用add方法,结果将无法预测。

Stream

这里不对Stream的API做过多的介绍,更多的是分析一下原理,所以比较枯燥。关于Stream的API,请移步这里

Stream简介

Stream是java 8中新增加的特性。

Stream 不是集合元素,它不是数据结构并不保存数据,它是有关算法和计算的,它更像一个高级版本的Iterator。原始版本的Iterator,用户只能显式地一个一个遍历元素并对其执行某些操作;高级版本的Stream,用户只要给出需要对其包含的元素执行什么操作,比如 “过滤掉长度大于 10 的字符串”、“获取每个字符串的首字母”等,Stream会隐式地在内部进行遍历,做出相应的数据转换。

Stream就如同一个迭代器(Iterator),单向,不可往复,数据只能遍历一次,遍历过一次后即用尽了,就好比流水从面前流过,一去不复返。

而和迭代器又不同的是,Stream可以并行化操作,迭代器只能命令式地、串行化操作。顾名思义,当使用串行方式去遍历时,每个item读完后再读下一个item。而使用并行去遍历时,数据会被分成多个段,其中每一个都在不同的线程中处理,然后将结果一起输出。Stream的并行操作依赖于Java 7中引入的Fork/Join框架来拆分任务和加速处理过程。Java的并行API演变历程基本如下:

1.0-1.4 中的 java.lang.Thread
5.0 中的 java.util.concurrent
6.0 中的 Phasers 等
7.0 中的 Fork/Join 框架
8.0 中的 Lambda

Stream 的另外一大特点是,数据源本身可以是无限的。

parallelStream简介

parallelStream其实就是一个并行执行的流。它通过默认的ForkJoinPool,可能提高你的多线程任务的速度。

parallelStream具有平行处理能力,处理的过程会分而治之,也就是将一个大任务切分成多个小任务,这表示每个任务都是一个操作:

1
2
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream().forEach(out::println);

得到的展示顺序不一定会是1、2、3、4、5、6、7、8、9,而可能是任意的顺序,就forEach()这个操作來讲,如果平行处理时,希望最后顺序是按照原来Stream的数据顺序,那可以调用forEachOrdered()。例如:

1
2
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.parallelStream().forEachOrdered(out::println);

注意:如果forEachOrdered()中间有其他如filter()的中介操作,会试着平行化处理,然后最终forEachOrdered()会以原数据顺序处理,因此,使用forEachOrdered()这类的有序处理,可能会(或完全失去)失去平行化的一些优势,实际上中介操作亦有可能如此,例如sorted()方法。

parallelStream背后的男人:ForkJoinPool

要想深入的研究parallelStream之前,我们必须先了解ForkJoin框架和ForkJoinPool。本文旨在parallelStream,但因为两种关系甚密,故在此简单介绍一下ForkJoinPool。

ForkJoin框架是从jdk7中的特性,它同ThreadPoolExecutor一样,也实现了Executor和ExecutorService接口。它使用了一个无限队列来保存需要执行的任务,而线程的数量则是通过构造函数传入,如果没有向构造函数中传入希望的线程数量,那么默认值为当前计算机可用的CPU数量会被设置为线程数量

ForkJoinPool主要用来使用分治法(Divide-and-Conquer Algorithm)来解决问题。典型的应用比如快速排序算法。这里的要点在于,ForkJoinPool需要使用相对少的线程来处理大量的任务。比如要对1000万个数据进行排序,那么会将这个任务分割成两个500万的排序任务和一个针对这两组500万数据的合并任务。以此类推,对于500万的数据也会做出同样的分割处理,到最后会设置一个阈值来规定当数据规模到多少时,停止这样的分割处理。比如,当元素的数量小于10时,会停止分割,转而使用插入排序对它们进行排序。那么到最后,所有的任务加起来会有大概2000000+个。问题的关键在于,对于一个任务而言,只有当它所有的子任务完成之后,它才能够被执行

所以当使用ThreadPoolExecutor时,使用分治法会存在问题,因为ThreadPoolExecutor中的线程无法像任务队列中再添加一个任务并且在等待该任务完成之后再继续执行。而使用ForkJoinPool时,就能够让其中的线程创建新的任务,并挂起当前的任务,此时线程就能够从队列中选择子任务执行。

使用ThreadPoolExecutor或者ForkJoinPool,会有什么性能的差异呢?
首先,使用ForkJoinPool能够使用数量有限的线程来完成非常多的具有父子关系的任务,比如使用4个线程来完成超过200万个任务。但是,使用ThreadPoolExecutor时,是不可能完成的,因为ThreadPoolExecutor中的Thread无法选择优先执行子任务,需要完成200万个具有父子关系的任务时,也需要200万个线程,显然这是不可行的。

工作窃取算法

forkjoin最核心的地方就是利用了现代硬件设备多核,在一个操作时候会有空闲的cpu,那么如何利用好这个空闲的cpu就成了提高性能的关键,而这里我们要提到的工作窃取(work-stealing)算法就是整个forkjion框架的核心理念,工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。

为什么需要使用工作窃取算法?
假如我们需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行

工作窃取算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,其缺点是在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且消耗了更多的系统资源,比如创建多个线程和多个双端队列。

用看forkjoin的眼光来看ParallelStreams

我们提到了在Java 8引入了自动并行化的概念。它能够让一部分Java代码自动地以并行的方式执行,也就是我们使用了ForkJoinPool的ParallelStream。

Java 8为ForkJoinPool添加了一个通用线程池,这个线程池用来处理那些没有被显式提交到任何线程池的任务。它是ForkJoinPool类型上的一个静态元素,它拥有的默认线程数量等于运行计算机上的处理器数量。当调用Arrays类上添加的新方法时,自动并行化就会发生。比如用来排序一个数组的并行快速排序,用来对一个数组中的元素进行并行遍历。自动并行化也被运用在Java 8新添加的Stream API中。

比如下面的代码用来遍历列表中的元素并执行需要的操作:

1
2
3
List<UserInfo> userInfoList =
DaoContainers.getUserInfoDAO().queryAllByList(new UserInfoModel());
userInfoList.parallelStream().forEach(RedisUserApi::setUserIdUserInfo);

对于列表中的元素的操作都会以并行的方式执行。forEach方法会为每个元素的计算操作创建一个任务,该任务会被前文中提到的ForkJoinPool中的通用线程池处理。以上的并行计算逻辑当然也可以使用ThreadPoolExecutor完成,但是就代码的可读性和代码量而言,使用ForkJoinPool明显更胜一筹。

对于ForkJoinPool通用线程池的线程数量,通常使用默认值就可以了,即运行时计算机的处理器数量。我这里提供了一个示例的代码让你了解jvm所使用的ForkJoinPool的线程数量, 你可以可以通过设置系统属性:-Djava.util.concurrent.ForkJoinPool.common.parallelism = N (N为线程数量),来调整ForkJoinPool的线程数量。

forEach会将执行forEach本身的线程也作为线程池中的一个工作线程。因此,即使将ForkJoinPool的通用线程池的线程数量设置为1,实际上也会有2个工作线程。因此在使用forEach的时候,线程数为1的ForkJoinPool通用线程池和线程数为2的ThreadPoolExecutor是等价的。

所以当ForkJoinPool通用线程池实际需要4个工作线程时,可以将它设置成3,那么在运行时可用的工作线程就是4了。

ParallelStreams的陷阱

上文中我们已经看到了ParallelStream强大无比的特性,但ParallelStreams不是万金油,而是一把双刃剑,如果错误的使用反倒可能伤人伤己.

以下是一个我们项目里使用parallel streams的很常见的情况。在这个例子中,我们想同时调用不同地址的api中并且获得第一个返回的结果。

1
2
3
4
5
6
7
public static String query(String q, List<String> {
Optional<String> result = engines.stream().parallel().map((base) -> {
String url = base + q;
return WS.url(url).get();
}).findAny();
return result.get();
}

我们仔细思考一下整个功能究竟是如何运转的。首先我们的集合元素engines由ParallelStreams并行的去进行map操作(ParallelStreams使用JVM默认的forkJoin框架的线程池由当前线程去执行并行操作)。

然而,这里需要注意的一地方是我们在调用第三方的api请求是一个响应略慢而且会阻塞操作的一个过程。所以在某时刻所有线程都会调用get()方法并且在那里等待结果返回。

再回过头仔细思考一下这个功能的实现过程是我们一开始想要的吗?我们是在同一时间等待所有的结果,而不是遍历这个列表按顺序等待每个回答。然而,由于ForkJoinPool workers的存在,这样平行的等待相对于使用主线程的等待会产生的一种副作用。

现在ForkJoin pool(关于forkjion的更多实现你可以去搜索引擎中去看一下他的具体实现方式)的实现是:它并不会因为产生了新的workers而抵消掉阻塞的workers。那么在某个时间所有ForkJoinPool.common()的线程都会被用光。也就是说,下一次你调用这个查询方法,就可能会在一个时间与其他的parallel stream同时运行,而导致第二个任务的性能大大受损。或者说,例如你在这个功能里是用来快速返回调用的第三方api的,而在其他的功能里是用于一些简单的数据并行计算的,但是假如你先调用了这个功能,同一时间之后调用计算的函数,那么这里forkjoinPool的实现会让你计算的函数大打折扣。

不过也不要急着去吐槽ForkJoinPool的实现,在不同的情况下你可以给它一个ManagedBlocker实例并且确保它知道在一个阻塞调用中应该什么时候去抵消掉卡住的workers。现在有意思的一点是,在一个parallel stream处理中并不一定是阻塞调用会拖延程序的性能。任何被用于映射在一个集合上的长时间运行的函数都会产生同样的问题。

总结

从stream和parallelStream方法中进行选择时,我们可以考虑以下几个问题:

是否需要并行?
你需要弄清楚你要解决的问题是什么,数据量有多大,计算的特点是什么?并不是所有的问题都适合使用并发程序来求解,比如当数据量不大时,顺序执行往往比并行执行更快。毕竟,准备线程池和其它相关资源也是需要时间的。但是,当任务涉及到I/O操作并且任务之间不互相依赖时,那么并行化就是一个不错的选择。通常而言,将这类程序并行化之后,执行速度会提升好几个等级。

任务之间是否是独立的?是否会引起任何竞态条件?
如果任务之间是独立的,并且代码中不涉及到对同一个对象的某个状态或者某个变量的更新操作,那么就表明代码是可以被并行化的。

结果是否取决于任务的调用顺序?
由于在并行环境中任务的执行顺序是不确定的,因此对于依赖于顺序的任务而言,并行化也许不能给出正确的结果。