前言:
ForkJoin是在Java7中新加入的特性,大家可能对其比较陌生,但是Java8中Stream的并行流parallelStream就是依赖于ForkJoin。在ForkJoin体系中最为关键的就是ForkJoinTask和ForkJoinPool,ForkJoin就是利用分治的思想将大的任务按照一定规则Fork拆分成小任务,再通过Join聚合起来。
什么是ForkJoin?ForkJoin 从字面上看Fork是分岔的意思,Join是结合的意思,我们可以理解为将大任务拆分成小任务进行计算求解,最后将小任务的结果进行结合求出大任务的解,这些裂变出来的小任务,我们就可以交给不同的线程去进行计算,这也就是分布式计算的一种思想。这与大数据中的分布式离线计算MapReduce类似,对ForkJoin最经典的一个应用就是Java8中的Stream,我们知道Stream分为串行流和并行流,其中并行流parallelStream就是依赖于ForkJoin来实现并行处理的。
下面我们一起来看一下最为核心的ForkJoinTask和ForkJoinPool。
ForkJoinTask 任务ForkJoinTask本身的依赖关系并不复杂,它与异步任务计算FutureTask一样均实现了Future接口,FutureTask我们在之前的文章中有讲到感兴趣的可以阅读一下——Java从源码看异步任务计算FutureTask
下面我们就ForkJoinTask的核心源码来研究一下,该任务是如何通过分治法进行计算。
ForkJoinTask最核心的莫过于fork()和join()方法了。
fork()
判断当前线程是不是ForkJoinWorkerThread线程是 直接将当前线程push到工作队列中否 调用ForkJoinPool 的externalPush方法在ForkJoinPool构建了一个静态的common对象,这里调用的就是common的externalPush()
join()
调用doJoin()方法,等待线程执行完成RecursiveTask 是ForkJoinTask的一个子类主要对获取结果的方法进行了实现,通过泛型约束结果。我们如果需要自己创建任务,仍需要实现RecursiveTask,并去编写最为核心的计算方法compute()。
ForkJoinPool 线程池ForkJoinTask 中许多功能都依赖于ForkJoinPool线程池,所以说ForkJoinTask运行离不开ForkJoinPool,ForkJoinPool与ThreadPoolExecutor有许多相似之处,他是专门用来执行ForkJoinTask任务的线程池,我之前也有文章对线程池技术进行了介绍,感兴趣的可以进行阅读——从java源码分析线程池(池化技术)的实现原理
ForkJoinPool与ThreadPoolExecutor的继承关系几乎是相同的,他们相当于兄弟关系。
工作窃取算法ForkJoinPool中采取工作窃取算法,如果每次fork子任务如果都去创建新线程去处理的话,对系统资源的开销是巨大的,所以必须采取线程池。一般的线程池只有一个任务队列,但是对于ForkJoinPool来说,由于同一个任务Fork出的各个子任务是平行关系,为了提高效率,减少线程的竞争,需要将这些平行的任务放到不同的队列中,由于线程处理不同任务的速度不同,这样就可能存在某个线程先执行完了自己队列中的任务,这时为了提升效率,就可以让该线程去“窃取”其它任务队列中的任务,这就是所谓的“工作窃取算法”。
对于一般的队列来说,入队元素都是在队尾,出队元素在队首,要满足“工作窃取”的需求,任务队列应该支持从“队尾”出队元素,这样可以减少与其它工作线程的冲突(因为其它工作线程会从队首获取自己任务队列中的任务),这时就需要使用双端阻塞队列来解决。
构造方法首先我们来看ForkJoinPool线程池的构造方法,他为我们提供了三种形式的构造,其中最为复杂的是四个入参的构造,下面我们看一下它四个入参都代表什么?
int parallelism 可并行级别(不代表最多存在的线程数量)ForkJoinWorkerThreadFactory factory 线程创建工厂UncaughtExceptionHandler handler 异常捕获处理器boolean asyncMode 先进先出的工作模式 或者 后进先出的工作模式 提交方法下面我们看一下提交任务的方法:
externalPush这个方法我们很眼熟,它正是在fork的时候如果当前线程不是ForkJoinWorkerThread,新提交任务也是会通过这个方法去执行任务。由此可见,fork就是新建一个子任务进行提交。
externalSubmit是最为核心的一个方法,它可以首次向池提交第一个任务,并执行二次初始化。它还可以检测外部线程的首次提交,并创建一个新的共享队列。
signalWork(ws, q)是发送工作信号,让工作队列进行运转。
创建工人(线程)提交任务后,通过signalWork(ws, q)方法,发送工作信号,当符合没有执行完毕,且没有出现异常的条件下,循环执行任务,根据控制变量尝试添加工人(线程),通过线程工厂,生成线程,并且启动线程,也控制着工人(线程)的下岗。
例:ForkJoinTask实现归并排序这里我们就用经典的归并排序为例,构建一个我们自己的ForkJoinTask,按照归并排序的思路,重写其核心的compute()方法,通过ForkJoinPool.submit(task)提交任务,通过get()同步获取任务执行结果。
ForkJoin计算流程通过ForkJoinPool提交任务,获取结果流程如下,拆分子任务不一定是二分的形式,可参照MapReduce的模式,也可以按照具体需求进行灵活的设计。
到此这篇关于一文带你了解Java中的ForkJoin的文章就介绍到这了,更多相关Java中的ForkJoin内容请搜索七叶笔记以前的文章或继续浏览下面的相关文章希望大家以后多多支持七叶笔记!