JDK源码分析——ForkJoinPool实例分析

M君 4月前 ⋅ 381 阅读

本篇文章中的源码可以从我的github上下载:

https://github.com/mh47838704/JavaExample

一、fk概要

本节主要通过编写应用实例来分析fk(ForkJoinPool的简称),通过对样例代码的调试可以从使用上对fk有一个直观的了解,当然关于fk的文章已经非常多了,本篇文章的特色主要在于可以提供一个比较全面直观的了解。本篇文章首先对什么是fk做一个简单的介绍,然后对实例代码进行分析,最后调试分析实例代码并进行总结。

目前介绍fk的相关的资源已经很多了,所以不再多过多的介绍。它主要提供了类似于mapreduce的功能,可以将任务划分成为多个同类型独立的子任务,所有的子任务会被fk自己维护的线程池执行(该线程池和普通的线程池有一定的区别,加入了一个work−stealing的机制)

前的内容是在网上可以搜索得到的,初一看确实是这么回事,但是经过我深入的分析发现:读者需要仔细区分两个东西:1、任务的划分不是fk负责的,是开发者自己负责的;2、fk仅仅封装了线程池和work−stealing机制,以及执行任务(PS:任务的执行不仅仅只包含业务逻辑,而且包含了任务的划分逻辑);

二、fk框架和任务划分

下面通过一张图展示一下前面对fk框架的说明,从下图我们可以看出,从最开始只有一个任务被添加到fk中,然后fk开始执行该任务,在该任务的执行方法中包含了两部分:1、任务本身的业务逻辑;2、子任务的划分逻辑。

通过该图我们可以了解了fk通过ForkJoinTask接口让开发者自己定制化业务逻辑和任务划分机制,fk内部如何调度运行ForkJoinTask,以及如何实现work-strealing机制对开发者都是透明的,从上面我们可以看出,如果用户对任务的划分机制不合理,那么会出现ForkJoinPool的栈溢出。(只是fk层面的,不是jvm层面的,可以通过捕获该异常并进行相关的修复处理)

三、实例代码分析

既然ForkJoinTask是fk暴露给开发者使用的接口,那么意味着开发只需要实现ForkJoinTask就可以了,在jdk中为了更加方便开发者开发,默认实现了部分类型的任务,本实例中主要介绍带结果存储的RecursiveTask,从RecursiveTask中我们可以看出该类实现了exec()方法,并提供了compute方法作为子类的扩展方法。

在该类的简介中有一个求取裴波拉切值的样例,从该样例中我们可以看到compute方法中包含了两个逻辑:业务执行逻辑、任务划分等待逻辑,和我们第二节中的图中的表示是一模一样的。

/**
* A recursive result-bearing {@link ForkJoinTask}.
*
*

For a classic example, here is a task computing Fibonacci numbers:
*
*

 {@code
* class Fibonacci extends RecursiveTask {
* final int n;
* Fibonacci(int n) { this.n = n; }
* Integer compute() {
* if (n <= 1)
* return n;
* Fibonacci f1 = new Fibonacci(n - 1);
* f1.fork();
* Fibonacci f2 = new Fibonacci(n - 2);
* return f2.compute() + f1.join();
* }
* }}


*
* However, besides being a dumb way to compute Fibonacci functions
* (there is a simple fast linear algorithm that you'd use in
* practice), this is likely to perform poorly because the smallest
* subtasks are too small to be worthwhile splitting up. Instead, as
* is the case for nearly all fork/join applications, you'd pick some
* minimum granularity size (for example 10 here) for which you always
* sequentially solve rather than subdividing.
*
* @since 1.7
* @author Doug Lea
*/
public abstract class RecursiveTask extends ForkJoinTask {
private static final long serialVersionUID = 5232453952276485270L;

/**
* The result of the computation.
*/
V result;

/**
* The main computation performed by this task.
* @return the result of the computation
*/
protected abstract V compute();

public final V getRawResult() {
return result;
}

protected final void setRawResult(V value) {
result = value;
}

/**
* Implements execution conventions for RecursiveTask.
*/
protected final boolean exec() {
result = compute();
return true;
}

}

通过查看RecursiveTask的介绍裴波拉切样例的实现,接下来我们实现一个数组的求和,下面是数组求和任务的样例代码(PS:本篇文章中的代码都可以在我的github中的JavaExample中找到)

package com.mh.JavaExample.fk;

import java.util.concurrent.RecursiveTask;

/**
* 求和递归任务
* Start at: 2018/5/27 13:11
*
* @author muhong
*/
public class MCountTask extends RecursiveTask {

private static int FACTOR = 2;

private long[] dataArray;
private int from;
private int end;

public MCountTask(long[] dataArray, int from, int end){
super();
this.dataArray = dataArray;
this.from = from;
this.end = end;
}
@Override
protected Long compute() {
long result = 0;
// 参数校验
if (!validate())
return result;
// 业务逻辑
System.out.println(Thread.currentThread().getId()+":开始计算");
if((end - from) < FACTOR){
for (int i = from; i < end; i++){
result += dataArray[i];
}
return result;
}
// 任务划分逻辑
int mid = (from + end) / 2;
MCountTask left = new MCountTask(dataArray, from, mid);
MCountTask right = new MCountTask(dataArray, mid, end);
left.fork();
right.fork();
left.join();
right.join();
long leftResult = (long)left.getRawResult();
long rightResult = (long)right.getRawResult();
result += leftResult + rightResult;
System.out.println(Thread.currentThread().getId()+":"+leftResult+","
+rightResult+","+result);
return result;
}

private boolean validate(){
if(this.dataArray == null || from < 0 || end < 0
|| from > end)
return false;
return true;
}
}

测试代码

package com.mh.JavaExample.fk;

import org.junit.Test;

import java.util.concurrent.ForkJoinPool;

/**
* 测试fk
* Start at: 2018/5/27 13:31
*
* @author muhong
*/
public class TestMain {

@Test
public void test(){
long[] dataArray = {1, 2, 3, 4, 5, 6, 7};
ForkJoinPool forkJoinPool = new ForkJoinPool();
MCountTask countTask = new MCountTask(dataArray, 0, dataArray.length);
forkJoinPool.submit(countTask);

}
}

 

四、运行结果分析

运行上面的测试代码,可以得到下面的输出(最前面是线程的ID)。从下面的输出我们可以看到有两个线程在进行求和计算,这说明了我们只需要关系任务的业务逻辑和任务划分逻辑,具体任务的线程调度运行是有fk自己维护的,fk具体的实现将会在后面的文章中介绍。

11:开始计算
11:开始计算
11:开始计算
11:开始计算
11:开始计算
11:开始计算
11:2,3,5
11:1,5,6
11:开始计算
11:开始计算
12:开始计算
12:开始计算
12:开始计算
12:6,7,13
14:开始计算
11:开始计算
11:4,5,9
11:9,13,22
11:6,22,28

 

五、参考

ForkJoinPool论文:http://gee.cs.oswego.edu/dl/papers/fj.pdf


注意:本文归作者所有,未经作者允许,不得转载

全部评论: 0

    我有话说: