forkjava fork join 框架最多多少个子任务

Fork/Join框架介绍
顺便测试jvm的内存溢出问题 - CSDN博客
Fork/Join框架介绍
顺便测试jvm的内存溢出问题
1. 什么是Fork/Join框架
Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
我们再通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算1+2+。。+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务的结果。Fork/Join的运行流程图如下:
2. Fork/Join框架的介绍
我们已经很清楚Fork/Join框架的需求了,那么我们可以思考一下,如果让我们来设计一个Fork/Join框架,该如何设计?这个思考有助于你理解Fork/Join框架的设计。
第一步分割任务。首先我们需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停的分割,直到分割出的子任务足够小。
第二步执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。
java.util.concurrent
类 ForkJoinTask&V&
java.util.concurrent.ForkJoinTask&V&
所有已实现的接口:
直接已知子类:
Fork/Join使用两个类来完成以上两件事情:
ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制,通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下两个子类:
RecursiveAction:用于没有返回结果的任务。
RecursiveTask :用于有返回结果的任务。
ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
3. 使用Fork/Join框架
让我们通过一个简单的需求来使用下Fork/Join框架,需求是:计算1+2+3+4的结果。
使用Fork/Join框架首先要考虑到的是如何分割任务,如果我们希望每个子任务最多执行两个数的相加,那么我们设置分割的阈值是2,由于是4个数字相加,所以Fork/Join框架会把这个任务fork成两个子任务,子任务一负责计算1+2,子任务二负责计算3+4,然后再join两个子任务的结果。因为是有结果的任务,所以必须继承RecursiveTask,实现代码如下:
import java.util.concurrent.RecursiveT
* @author zhengchao
public class CalculatorFork_Join extends RecursiveTask&Long& {
private static final int THRESHOLD = 10000;
public CalculatorFork_Join(int start, int end) {
this.start =
this.end =
protected Long compute() {
long sum = 0;
if((end - start) & THRESHOLD){
for(int i = i&=i++){
int middle = (start + end) /2;
CalculatorFork_Join left = new CalculatorFork_Join(start, middle);
CalculatorFork_Join right = new CalculatorFork_Join(middle + 1, end);
left.fork();
right.fork();
sum = left.join() + right.join();
import java.util.concurrent.ExecutionE
import java.util.concurrent.ForkJoinP
import java.util.concurrent.F
* @author zhengchao
public class testMain {
public static void main(String[] args) throws ExecutionException, InterruptedException{
long start = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
Future&Long& result = forkJoinPool.submit(new CalculatorFork_Join(0, 1000000));
System.out.println(&ForkJoinCostTime:&+(System.currentTimeMillis()-start));
System.out.println(result.get());
start = System.currentTimeMillis();
long sum = 0;
for(int i=0;i&=1000000;i++){
System.out.println(&CostTime:&+(System.currentTimeMillis()-start));
System.out.println(sum);
forkJoinPool.shutdown();
通过这个例子让我们再来进一步了解ForkJoinTask,ForkJoinTask与一般的任务的主要区别在于它需要实现compute方法,在这个方法里,首先需要判断任务是否足够小,如果足够小就直接执行任务。如果不足够小,就必须分割成两个子任务,每个子任务在调用fork方法时,又会进入compute方法,看看当前子任务是否需要继续分割成孙任务,如果不需要继续分割,则执行当前子任务并返回结果。使用join方法会等待子任务执行完并得到其结果。
性能分析:
1、计算1-10000
ForkJoinCostTime:1
CostTime:0
成功构建 (总时间: 0 秒)数据量较小时,是用Fork/Join效果不明显,甚至不如正常的计算,这是因为创建Fork/Join耗时了。
2、计算1-100000
ForkJoinCostTime:1
CostTime:2
成功构建 (总时间: 0 秒)
3、计算1-1000000
ForkJoinCostTime:1
CostTime:3
成功构建 (总时间: 0 秒)
如果我们将
private static final int THRESHOLD = 10000; 设置为100,运行计算计算1-1000000:
ForkJoinCostTime:2
Exception in thread &main& java.util.concurrent.ExecutionException: java.lang.OutOfMemoryError
at java.util.concurrent.ForkJoinTask.get(ForkJoinTask.java:942)
at Concurrent.testMain.main(testMain.java:26)
Caused by: java.lang.OutOfMemoryError
at sun.reflect.GeneratedConstructorAccessor1.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:536)
at java.util.concurrent.ForkJoinTask.get(ForkJoinTask.java:941)
... 1 more
Caused by: java.lang.OutOfMemoryError
at sun.reflect.GeneratedConstructorAccessor1.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:536)
at java.util.concurrent.ForkJoinTask.reportResult(ForkJoinTask.java:596)
at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:640)
at Concurrent.pute(CalculatorFork_Join.java:40)
at Concurrent.pute(CalculatorFork_Join.java:15)
at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:93)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
at java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784)
at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
Caused by: java.lang.OutOfMemoryError
at sun.reflect.GeneratedConstructorAccessor1.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:536)
at java.util.concurrent.ForkJoinTask.reportResult(ForkJoinTask.java:596)
at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:640)
at Concurrent.pute(CalculatorFork_Join.java:40)
at Concurrent.pute(CalculatorFork_Join.java:15)
at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:93)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
at java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:762)
... 2 more
Caused by: java.lang.OutOfMemoryError
... 14 more
Caused by: java.lang.OutOfMemoryError
... 14 more
Caused by: java.lang.OutOfMemoryError
... 14 more
Caused by: java.lang.OutOfMemoryError
... 14 more
Caused by: java.lang.OutOfMemoryError
... 14 more
Caused by: java.lang.OutOfMemoryError
... 14 more
Caused by: java.lang.OutOfMemoryError
... 14 more
Caused by: java.lang.OutOfMemoryError
... 14 more
Caused by: java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Thread.java:713)
at java.util.concurrent.ForkJoinPool.addWorker(ForkJoinPool.java:1132)
at java.util.concurrent.ForkJoinPool.tryPreBlock(ForkJoinPool.java:1009)
at java.util.concurrent.ForkJoinPool.tryAwaitJoin(ForkJoinPool.java:1042)
at java.util.concurrent.ForkJoinWorkerThread.joinTask(ForkJoinWorkerThread.java:731)
at java.util.concurrent.ForkJoinTask.doJoin(ForkJoinTask.java:362)
at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:639)
... 8 more
Java Result: 1
成功构建 (总时间: 0 秒)
内存溢出了,为什么呢?因为我们创建了太大的线程,导致内存消耗殆尽。
所以,阈值要设定的恰当,否则可能效率并不会提高,设置导致异常的产生。
本文已收录于以下专栏:
相关文章推荐
Java 7 并发编程实战手册目录代码下载(/Wang-Jun-Chao/java-concurrency)第五章 Fork/Join框架5.1简介  通常,使用J...
此博客迁至
http://maoshijie.me/
应用程序并行计算遇到的问题
当硬件处理能力不能按摩尔定律垂直发展的时候,选择了水平发展。多核处理器已广泛应用,未来处理器的核...
本周我们进入join的处理环节,其实在一开始学“连接”这个概念的时候,我感觉最晕菜的事儿是个类Join的区别。
left join 、right join 、outer join、inner join...
JDK 1.7 提供了一个并行计算的框架,本文就来分析一下它在使用和实现原理上,和ThreadPool有什么区别。何为Fork/Join?我们在大学算法课本上,学过的一种基本算法就是:分治。其基本思路...
之前说到ForkJoinPool的初始化,以及Task定义的方式,现在说说这个task是怎么启动的。从我们一般写代码的话,只会这样写一句:mainPool.invoke(task);就是调用了main...
关于ForkJoinTask:1,可以使用invokeAll(task)方法,主动执行其它的ForkJoinTask,并等待Task完成。(是同步的)2,还可以使用fork方法,让一个task执行(这...
细节不详细介绍,具体可参考q.com/cn/articles/fork-join-introduction。本文主要分析的submit处理流程,示例代码:package ...
Spring从2.5版本开始在编程中引入注解,用户可以使用@RequestMapping, @RequestParam, @ModelAttribute等等这样类似的注解。到目前为止,Spring的版...
来自devstore
美国队长:这怎么知道?
国防军官:这怎么不知道,21世纪就是本数码书。佐拉教会九头蛇怎么去读它,你的银行记录、病例、投票模式、电子邮件、通话信息、还有大学成绩单...左拉...
Java 8 已经发布一段时间了,许多开发者已经开始使用 Java 8。本文也将讨论最新发布在 JDK 中的并发功能更新。事实上,JDK 中已经有多处java.util.concurrent 改动,但...
他的最新文章
讲师:钟钦成
讲师:宋宝华
您举报文章:
举报原因:
原文地址:
原因补充:
(最多只允许输入30个字)让天下没有难学的技术
Fork/Join框架(六)取消任务
Fork/Join框架(六)取消任务
声明:本文是《 》的第五章,作者: Javier Fernández González
译者:许巧辉 校对:方腾飞
当你在一个ForkJoinPool类中执行ForkJoinTask对象,在它们开始执行之前,你可以取消执行它们。ForkJoinTask类提供cancel()方法用于这个目的。当你想要取消一个任务时,有一些点你必须考虑一下,这些点如下:
ForkJoinPool类并没有提供任何方法来取消正在池中运行或等待的所有任务。
当你取消一个任务时,你不能取消一个已经执行的任务。
在这个指南中,你将实现取消ForkJoinTask对象的例子。你将查找数在数组中的位置。第一个找到这个数的任务将取消剩下的任务(未找到这个数的任务)。由于Fork/Join框架并没有提供这种功能,所以,你将实现一个辅助类来做这个取消功能。
准备工作…
这个指南的例子使用Eclipse IDE实现。如果你使用Eclipse或其他IDE,如NetBeans,打开它并创建一个新的Java项目。
按以下步骤来实现这个例子:
1.创建ArrayGenerator类。这个类将产生一组随机的、指定大小的整数数字。实现generateArray()方法。它将产生一组数字,它接收数组大小作为参数。
public class ArrayGenerator {
public int[] generateArray(int size) {
int array[]=new int[size];
Random random=new Random();
for (int i=0; i& i++){
array[i]=random.nextInt(10);
2.创建一个TaskManager类。我们将使用这个类来存储在ForkJoinPool中执行的所有任务。由于ForkJoinPool和ForkJoinTask类的局限性,你将使用这个类来取消ForkJoinPool类的所有任务。
public class TaskManager {
3.声明一个对象参数化为ForkJoinTask类型的数列,其中ForkJoinTask类参数化为Integer类型。
private List&ForkJoinTask&Integer&&
4.实现这个类的构造器,它初始化任务数列。
public TaskManager(){
tasks=new ArrayList&&();
5.实现addTask()方法。它添加ForkJoinTask对象到任务数列。
public void addTask(ForkJoinTask&Integer& task){
tasks.add(task);
6.实现cancelTasks()方法。它将使用cancel()方法取消在数列中的所有ForkJoinTask对象。它接收一个想要取消剩余任务的ForkJoinTask对象作为参数。这个方法取消所有任务。
public void cancelTasks(ForkJoinTask&Integer& cancelTask){
for (ForkJoinTask&Integer& task :tasks) {
if (task!=cancelTask) {
task.cancel(true);
((SearchNumberTask)task).writeCancelMessage();
7.实现SearchNumberTask类,指定它继承参数化为Integer类型的RecursiveTask类。这个类将查找在整数数组的元素块中的数。
public class SearchNumberTask extends RecursiveTask&Integer& {
8.声明一个私有的、int类型的数字数组。
private int numbers[];
9.声明两个私有的、int类型的属性start和end。这些属性将决定任务要处理的数组的元素。
private int start,
10.声明一个私有的、int类型的属性number,它将存储你将要查找的数。
11.声明一个私有的、TaskManager类型的属性manager。你将使用这个对象来取消所有任务。
private TaskM
12.声明一个私有的、int类型的常量并初始化它为值-1。当任务没有找到这个数时,它将作为任务的返回值。
private final static int NOT_FOUND=-1;
13.实现这个类的构造器来初始化它的属性。
public Task(int numbers[], int start, int end, int number,
TaskManager manager){
this.numbers=
this.start=
this.number=
this.manager=
14.实现compute()方法。写入一条信息(start和end属性值)到控制台表明这个方法的开始。
protected Integer compute() {
System.out.println(&Task: &+start+&:&+end);
15.如果start和end之差大于10(这个任务将处理超过10个元素的数组),调用launchTasks()方法,将这个任务的工作拆分成两个任务。
if (end-start&10) {
ret=launchTasks();
16.否则,这个任务调用lookForNumber()方法来查找在数组块中的数。
ret=lookForNumber();
17.返回任务的结果。
18.实现lookForNumber()方法。
private int lookForNumber() {
19.对于任务要处理的元素块中的所有元素,将你想要查找的数与存储在元素中的值进行比较。如果他们相等,写入一条信息到控制台表明这种情形,使用TaskManager对象的cancelTasks()方法来取消所有任务,并返回你已经找到的这个数对应元素的位置。
for (int i= i& i++){
if (array[i]==number) {
System.out.printf(&Task: Number %d found in position
%d\n&,number,i);
manager.cancelTasks(this);
20.在循环的内部,令任务睡眠1秒。
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
21.最后,返回值-1。
return NOT_FOUND;
22.实现launchTasks()方法。首先,将这个任务要处理的数块分成两个部分,然后,创建两个Task对象来处理它们。
private int launchTasks() {
int mid=(start+end)/2;
Task task1=new Task(array,start,mid,number,manager);
Task task2=new Task(array,mid,end,number,manager);
23.添加这个任务到TaskManager对象中。
manager.addTask(task1);
manager.addTask(task2);
24.使用fork()方法异步执行这两个任务。
task1.fork();
task2.fork();
25.等待这个任务的结束,返回第一个任务的结果(如果它不等于1),或第二个任务的结果。
int returnV
returnValue=task1.join();
if (returnValue!=-1) {
return returnV
returnValue=task2.join();
return returnV
26.实现writeCancelMessage()方法,当任务取消时,写一条信息到控制台。
public void writeCancelMessage(){
System.out.printf(&Task: Canceled task from %d to
%d&,start,end);
27.实现这个例子的主类,通过创建Main类,并实现main()方法。
public class Main {
public static void main(String[] args) {
28.使用ArrayGenerator类,创建一个有1000个数字的数组。
ArrayGenerator generator=new ArrayGenerator();
int array[]=generator.generateArray(1000);
29.创建一个TaskManager对象。
TaskManager manager=new TaskManager();
30.使用默认的构造器创建一个ForkJoinPool对象。
ForkJoinPool pool=new ForkJoinPool();
31.创建一个Task对象来处理前面生成的数组。
Task task=new Task(array,0,1000,5,manager);
32.使用execute()方法,在池中异步执行任务。
pool.execute(task);
33.使用shutdown()方法关闭这个池。
pool.shutdown();
34.使用ForkJoinPool类的awaitTermination()方法,等待任务的结束。
pool.awaitTermination(1, TimeUnit.DAYS);
} catch (InterruptedException e) {
e.printStackTrace();
35.写入一条信息到控制台,表明程序的结束。
System.out.printf(&Main: The program has finished\n&);
它是如何工作的…
ForkJoinTask提供cancel()方法,允许你取消一个还未执行的任务。这是一个非常重要的点。如果任务已经开始它的执行,那么调用cancel()方法对它没有影响。这个方法接收一个Boolean值,名为mayInterruptIfRunning的参数。这个名字可能让你觉得,如果你传入一个true值给这个方法,这个任务将被取消,即使它正在运行。
Java API文档指出,在ForkJoinTask类的默认实现中,这个属性不起作用。任务只能在它们还未开始执行时被取消。一个任务的取消不会影响到已经提到到池的(其他)任务。它们继续它们的执行。 Fork/Join框架的一个局限性是,它不允许取消在ForkJoinPool中的所有任务。为了克服这个限制,你实现了TaskManager类。它存储被提到池中的所有任务。它有一个方法取消它存储的所有任务。如果一个任务由于它正在运行或已经完成而不能被取消,cancel()方法返回false值,所以,你可以尝试取消所有任务,而不用担心可能有间接的影响。 在这个例子中,你已经实现一个任务,用来在一个数字数组中查找一个数。如Fork/Join框架所推荐的,你将问题分解成更小的子问题。你只关心这个数的出现,所以当你找到它,你取消了其他任务。 以下截图显示这个例子执行的一部分:
在第5章,Fork/Join框架中的创建一个Fork/Join池指南
原创文章,转载请注明: 转载自本文链接地址:
许巧辉,目前在Letv练级,平时喜欢看书,关注Java并发
Latest posts by Snway ()
Related posts:
(4 votes, average: 3.75 out of 5)
Loading...线程基础:多任务处理——Fork/Join框架(要点2)
2-3. ForkJoinPool中的队列
那么ForkJoinPool是怎样创建队列的呢?请看如下两段源代码片段:
* Tries to add the given task to a submission queue at
* submitter's current queue. Only the (vastly) most common path
* is directly handled in this method, while screening for need
* for externalSubmit.
// ForkJoinPool类中的方法
// 该方法试图将一个任务提交到一个submission queue中,随机提交
final void externalPush(ForkJoinTask task) {
WorkQueue[] WorkQ
// 取得一个随机探查数,可能为0也可能为其它数
int r = ThreadLocalRandom.getProbe();
// 获取当前ForkJoinPool的运行状态
int rs = runS
// 最关键的操作在这里,详见后文说明
if ((ws = workQueues) != null && (m = (ws.length - 1)) &= 0 &&
(q = ws[m & r & SQMASK]) != null && r != 0 && rs & 0 &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask[] int am, n,
if ((a = q.array) != null && (am = a.length - 1) & (n = (s = q.top) - q.base)) {
int j = ((am & s) && ASHIFT) + ABASE;
// 以下三个原子操作首先是将task放入队列
U.putOrderedObject(a, j, task);
// 然后将&q&这个submission queue的top标记+1
U.putOrderedInt(q, QTOP, s + 1);
// 最后解除这个submission queue的锁定状态
U.putIntVolatile(q, QLOCK, 0);
// 如果条件成立,说明这时处于active的工作线程可能还不够
// 所以调用signalWork方法
if (n &= 1)
signalWork(ws, q);
// 这里试图接除对这个submission queue的锁定状态
// 为什么会有两次接触呢?因为在之前代码中给队列加锁后,
// 可能队列的现有空间并不满足添加新的task的条件
U.compareAndSwapInt(q, QLOCK, 1, 0);
externalSubmit(task);
* Full version of externalPush, handling uncommon cases, as well
* as performing secondary initialization upon the first
* submission of the first task to the pool.
It also detects
* first submission by an external thread and creates a new shared
* queue if the one at index if empty or contended.
// 以下是externalSubmit方法的部分代码,用于初始化ForkJoinPool中的队列
private void externalSubmit(ForkJoinTask task) {
// initialize
// 如果条件成立,就说明当前ForkJoinPool类中,还没有任何队列,所以要进行队列初始化
else if ((rs & STARTED) == 0 ||
((ws = workQueues) == null || (m = ws.length - 1) & 0)) {
int ns = 0;
rs = lockRunState();
if ((rs & STARTED) == 0) {
// 通过原子操作,完成&任务窃取次数&这个计数器的初始化
U.compareAndSwapObject(this, STEALCOUNTER, null, new AtomicLong());
// create workQueues array with size a power of two
// 这段代码也非常有趣,详见后文的分析。
int p = config & SMASK; // ensure at least 2 slots
int n = (p & 1) ? p - 1 : 1;
n |= n &&& 1; n |= n &&& 2;
n |= n &&& 4;
n |= n &&& 8; n |= n &&& 16; n = (n + 1) && 1;
workQueues = new WorkQueue[n];
ns = STARTED;
} finally {
unlockRunState(rs, (rs & ~RSLOCK) | ns);
externalPush方法中的&q = ws[m & r & SQMASK]&代码非常重要。我们大致来分析一下作者的意图,首先m是ForkJoinPool中的WorkQueue数组长度减1,例如当前WorkQueue数组大小为16,那么m的值就为15;r是一个线程独立的随机数生成器,关于java.util.concurrent.ThreadLocalRandom类的功能和使用方式可参见其它资料;而SQMASK是一个常量,值为126 (0x7e)。以下是一种可能的计算过程和计算结果:
实际上任何数和126进行&与&运算,其结果只可能是0或者偶数,即0 、 2 、 4 、 6 、 8。也就是说以上代码中从名为&ws&的WorkQueue数组中,取出的元素只可能是第0个或者第偶数个队列。
我们再来看看以上代码给出的externalSubmit方法中,进行WZ喎"/kf/ware/vc/" target="_blank" class="keylink">vcmtRdWV1Zcr91+mz9cq8u6+1xLT6wuuho7WxzeKyv7X308PV4s2ouf1zdWJtaXShomV4ZWN1dGWhomludm9rZbe9t6jP8kZvcmtKb2luUG9vbMzhvbvSu7j2vMbL48jOzvHKsaOsu+HUy9DQ1eK2zrT6wuvOqkZvcmtKb2luUG9vbLS0vai24Lj2V29ya1F1ZXVlsqLQzrPJyv3X6aGjxuTW0NLUz8K0+sLrxqy2ztPD09rIt7ao1eK49ry0vau0tL2otcRXb3JrUXVldWXK/dfptcS089ChPC9wPg0KPHByZSBjbGFzcz0="brush:">
// SMASK是一个常量
static final int SMASK = 0
// 这是config的来源
// mode是ForkJoinPool构造函数中设定的asyncMode,如果为LIFO,则mode为0,否则为65536
// parallelism 为技术人员设置的(或者程序自行设定的)并发等级
this.config = (parallelism & SMASK) |
// ensure at least 2 slots
int p = config & SMASK;
// n这个变量就是要计算的WorkQueue数组的大小
int n = (p & 1) ? p - 1 : 1;
n |= n &&& 1; n |= n &&& 2;
n |= n &&& 4;
n |= n &&& 8; n |= n &&& 16; n = (n + 1) && 1;
从以上整理的代码可以看出,最后确认ForkJoinPool中WorkQueue数组初始化大小的因素是名叫config的变量,而config变量又与构造ForkJoinPool时所传入的并发等级(parallelism)、异步模式(asyncMode)有关。在我们选择LIFO模式时,计算结果如下表所示(下表中的结果建立在mode = 0的前提下):
parallelism
parallelism
是的,计算结果&n&按照两倍规模进行扩展,并且在初始化时保证和并发级别设定的数量(parallelism)至少两倍的关系。这是为什么呢?这是因为ForkJoinPool中的这些WorkQueue和工作线程ForkJoinWorkerThread并不是一对一的关系,而是随时都有多余ForkJoinWorkerThread数量的WorkQueue元素。而这个ForkJoinPool中的WorkQueue数组中,索引位为非奇数的工作队列用于存储从外部提交到ForkJoinPool中的任务,也就是所谓的submissions queue;索引位为偶数的工作队列用于存储归并计算过程中等待处理的子任务,也就是task queue。
这样我们也就可以明白,ForkJoinPool中重写的toString()方法,是如何取得submissions、tasks、steals和running监控数据,请看以下toString()方法的源码片段:
public String toString() {
WorkQueue[] WorkQ
if ((ws = workQueues) != null) {
// 循环着,依次遍历当前ForkJoinPool中WorkQueue数组的每一个元素
for (int i = 0; i & ws. ++i) {
if ((w = ws[i]) != null) {
// 获取当前WorkQueue中元素的数量
// (WorkQueue也是使用数组方式存储这些元素)
int size = w.queueSize();
// 如果当前数组元素的索引位为非奇数
// 说明是submissions queue,这时submissions计数器发生累加
if ((i & 1) == 0)
// 否则说明是task queue,这时tasks计数器增加
// 通过nsteals属性,获得这个task queue中任务被&窃取&的次数
// 如果条件成立,就说明当前task queue所对应的工作线程
// 没有被任何方式阻塞,所以running计数器增加
if (w.isApparentlyUnblocked())
return super.toString() +
&, running = & + rc +
&, steals = & + st +
&, tasks = & + qt +
&, submissions = & + qs +
注释比较详细,而且比起workqueue的入队出队逻辑和任务窃取逻辑,以上代码就是非常简单了。所以这里就不再赘述代码过程了。注意,以上代码分析基于JDK 1.8的分析,而与JDK1.7中ForkJoinPool的实现有较大差异。
2-4. 避免伪共享
所谓伪共享是指多核CPU在无锁情况下同时抢占同一缓存行的写操作权限所引起的频繁切换操作。是不是不好理解这句话?那么我们就采用图文方式来说明一下伪共享产生的场景:
上图是目前典型的计算机硬件层组织结构(只画了北桥部分,南桥部分不在我们讨论范围内,就忽略了),主存和CPU之间由北桥芯片负责数据交换,北桥芯片和CPU间的通讯速度被称为前端总线速度(和芯片类型、总线频率等因素有关),虽然基于目前的技术基础,前端总线速度都比较快,但总的来说还是算比较珍贵的计算资源。CPU内部结构主要有计算内核、一级缓存(L1)、二级缓存(L2)和三级缓存(L3),其中L1缓存为每个计算内核所独享的,L2缓存是否由每个计算内核所独享视不同的CPU型号不同而有所区别,L3缓存一般为CPU内核锁共享。L3、L2、L1缓存的内部读写速度和制造成本依次递增,这就意味着它们的容量依次递减。以AMD 锐龙5 1400 这款CPU为例,这是一款4核民用级CPU,L1 数据缓存(L1 Data)大小为 4 & 32KBytes、L2 缓存 4 & 64KBytes、L3 缓存2 & 4MBytes。注意L1级缓存除了数据缓存以外还有独立的指令缓存(L1 Inst),另外AMD 锐龙5 1400这款CPU的L3缓存为每两核共享一组,所以会出现2 & 4MBytes的情况。
为了避免CPU内核和相对缓慢的主存随时进行数据交互,CPU对在计算时对线程中数据的读写主要依靠这几级缓存进行,再通过前段总线将缓存中的数据和CPU数据进行同步。注意,真的只是相对而言的慢,DDR3/DDR4的内存速度还是很快的,DDR3-1600的读写速度可以达到 2.2 GB/s,而CPU L1缓存的数据读取速度可以达到 900 GB/s!
CPU对于数据缓存的操作单位是缓存行,也就是说当进行CPU要对内存中的数据进行操作时,CPU会将内存地址和临近地址的数据全部装入缓存后,再进行操作(实际上现代计算机系统为了加快I/O性能,大量采用这样的操作原则,例如在&数据存储&专题讲到MySQL的InnoDB数据引擎从磁盘上读取数据到内存时,也是采用&Page&为单位进行的)。例如给出的CPU缓存行每一行都有64个字节(部分型号CPU的缓存行为32个字节),而一个长整型的大小为64位也就是8个字节,那么一个缓存行就可以容纳8个长整型(而实际上缓存行头部会使用8字节加载一些描述信息,所以实际上最多能存7个长整型)。
如果工作在两个不同内核的两个线程(或者多个线程),需要对同一内存地址的数据进行写操作该怎么办呢?这里就要提到一个协议&&MESI协议(可以参看,或者这本书《What Every Programmer Should Know About Memory》)。简单的来说,如果CPU内核要对缓存行的数据进行写操作时,首先根据这个缓存行的状态,分析该缓存行的数据是否可能也存在于其它CPU内核的缓存行中(缓存行状态为S),如果有则发送一个RFO请求到其它CPU,让其它CPU设定缓存行的状态为Invalid(无效状态),这样就可以保证只有本CPU能够对缓存行的数据进行写操作。
那么问题就很明显了,如果这些内核不停的相互请求对同一数据的写操作权限,就会出现资源抢占的情况,导致各个CPU内核L1和L2缓存全部失效,最后都不得不到L3甚至主存中重读数据。这都还算好比较好的情况,上文已经说过CPU对数据的操作以缓存行为单位,而一个缓存行可容纳多个数据(这里记为A数据和B数据),CPU内核1需要对数据A进行修改,同时CPU内核2需要对数据B进行修改。当这样的情况出现时,从技术人员角度来看好像并没有出现资源抢占,但实际上在多个CPU内核中发生的情况却发生了不必要的抢占。
这就是伪共享,这个问题在中当然有解决办法,但是相对比较&暴力&。原理就是让A数据和B数据处于不同的缓存行&&缓存行在存储A数据后多出来的空间,采用一些无用的基本数据进行&补全&。这样B数据就可以处于不同缓存行了,如下代码所示:
// 这个是真实业务需要的属性,是一个长整型
public volatile long a = 0L;
// 需要补6个长整型
public long a1, a2, a3, a4, a5, a6;
//=================
// 值得一提的是,有的CPU缓存航为64字节,所以为了兼容性更好一点
// 你可以一直填充14个长整形
public long a1, a2, a3, a4, a5, a6;
public volatile long a = 0L;
public long a7, a8, a9, a10, a11, a12, a13, a14;
以上是Java代码的示例,是JDK1.7以及之前版本使用的一种解决方式。而JDK 1.8中有了一个更便利的解决方式,就是&@sun.misc.Contended&标记,而它的一个典型应用场景就在ForkJoinPool中:
@sun.misc.Contended
public class ForkJoinPool extends AbstractExecutorService {
2-5. ForkJoinPool工作监控
ForkJoinPool重写了toString()方法,以便技术人员在代码调试或者其它需要临时监控ForkJoinPool运行情况的场景下,轻松获取ForkJoinPool中的主要工作状态。以下运行效果展示了ForkJoinPool类的toString()方法打印的情况:
**********************
@2503dbd3[Running, parallelism = 6, size = 9, active = 7, running = 7, steals = 14, tasks = 224, submissions = 0]
**********************
@2503dbd3[Running, parallelism = 6, size = 9, active = 7, running = 7, steals = 14, tasks = 213, submissions = 0]
**********************
@2503dbd3[Running, parallelism = 6, size = 9, active = 7, running = 7, steals = 14, tasks = 164, submissions = 0]
**********************
@2503dbd3[Running, parallelism = 6, size = 9, active = 9, running = 9, steals = 22, tasks = 281, submissions = 0]
**********************
@2503dbd3[Running, parallelism = 6, size = 9, active = 8, running = 8, steals = 22, tasks = 212, submissions = 0]
**********************
@2503dbd3[Running, parallelism = 6, size = 9, active = 7, running = 7, steals = 22, tasks = 192, submissions = 0]
**********************
parallelism:当前ForkJoinPool设定的并行级别
size:当前ForkJoinPool线程池内部的所有线程数量,这些线程可能处于阻塞状态(使用join方法引起的阻塞或者任务中其它会引起线程阻塞方法引起的阻塞),可能处于运行状态。
active:当前线程池内部,正在进行compute计算的线程(这些线程不代表没有被阻塞)。
running:当前线程池内部,正在进行compute计算并且没有被任何阻塞线程阻塞机制所影响的线程数量
steals:当前ForkJoinPool线程池内部各个work queue间发生的&工作窃取&操作的总次数。
tasks:当前ForkJoinPool线程池内部各个work queue中等待处理的子任务总数量。
submissions:通过submit方式或者其它方式提交到ForkJoinPool中,准备进行归并计算的但是ForkJoinPool还没有开始处理的任务(ForkJoinTask任务或者其子任务)数量。
这里要重点说明一下active和running两个返回信息的关系和区别:通常情况下active数量和running数量是一致的,因为正在运行归并计算子任务的线程,肯定是处于运行状态,否则它怎么进行计算呢?但是如果技术人员在进行归并计算的时候,主动阻塞了线程就另当别论了。例如技术人员在compute()方法中使用wait方法主动阻塞线程的情况:
protected int[] compute() {
// 让任务随机等待
// 因为要进行1亿次计算,大约设定百万分之1的概率强制阻塞
if(ThreadLocalRandom.current().nextFloat() & 0.000001f) {
synchronized (this) {
this.wait(1000);
} catch (InterruptedException e) {
e.printStackTrace(System.out);
以上的代码在对1亿条数据数进行排序时,有百万分之一的概率阻塞排序子任务。在子任务进行归并计算时,可强制让计算线程阻塞1秒。这时我们再执行这个应用程序并且进行监控,那么以下可能就是我们会看到的监控信息了:
**********************
@2503dbd3[Running, parallelism = 4, size = 7, active = 5, running = 3, steals = 21, tasks = 154, submissions = 0]
**********************
@2503dbd3[Running, parallelism = 4, size = 7, active = 5, running = 0, steals = 21, tasks = 142, submissions = 0]
**********************
@2503dbd3[Running, parallelism = 4, size = 7, active = 5, running = 2, steals = 25, tasks = 131, submissions = 0]
**********************
@2503dbd3[Running, parallelism = 4, size = 7, active = 7, running = 2, steals = 28, tasks = 212, submissions = 0]
**********************
@2503dbd3[Running, parallelism = 4, size = 7, active = 7, running = 3, steals = 28, tasks = 216, submissions = 0]
**********************
@2503dbd3[Running, parallelism = 4, size = 7, active = 6, running = 0, steals = 28, tasks = 148, submissions = 0]
**********************
@2503dbd3[Running, parallelism = 4, size = 7, active = 5, running = 1, steals = 28, tasks = 142, submissions = 0]
**********************
因为是随机概率,所以读者自行运行的监控效果和这里给出的监控效果是不同的。

我要回帖

更多关于 fork join任务拆分 的文章

 

随机推荐