JDK源码分析——ThreadPoolExecutor实现分析

M君 5月前 ⋅ 389 阅读

本篇文章主要对jdk中线程池ThreadPoolExecutor源码中部分关键功能的源码实现进行解析,主要包括:

  1. 线程池的存放的容器——HashMap
  2. 工作线程Worker的实现,以及任务执行方式
  3. 有新的执行任务到达的时候,任务执行过程:分配新的线程?还是加入任务队列
  4. 当线程池中任务很少,但是线程池中空闲量过大的情况,线程池如何处理多余的空闲线程

如果想要了解上述情况在ThreadPoolExecutor中如何实现的,可以参考下面的文章(已获得作者的转载授权):

https://blog.csdn.net/m47838704/article/details/79548569

ThreadPoolExecutor已经对上面的各种情况进行了封装处理,并且提供了相应的参数对上述各种情况进行控制,首先我们看一下该类的构造函数,从源码我们可以看出该类的构造函数有很多个,不同的构造函数需要填写的参数不同。针对没有填写的参数都有相应的默认的值,这样可以简化开发者的配置。但是针对性能要求更高的情况,建议开发者自定义所有的参数。下面我们首先通过分析构造函数来对该类的功能进行一个整体的认识:

/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
* @throws IllegalArgumentException if one of the following holds:

* {@code corePoolSize < 0}

* {@code keepAliveTime < 0}

* {@code maximumPoolSize <= 0}

* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} or {@code handler} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}

从上面的构造函数我们可以看出开发者在使用该类的时候可以配置的所有的参数,我们首先分析上面构造函数的参数的意义,然后我们看看该类源码中所有的参数的描述:

corePoolSize:线程池中核心线程的数量,指线程池中需要保留的最小的线程的数(PS:当线程池刚初始化的时候,线程池中的线程数量<corePoolSize)

maximumPoolSize:线程池中允许的最大的线程的数量

workQueue:任务队列,当新任务到达且线程池中没有线程可用的时候,任务将会被暂时存放到任务队列中,等待其他线程执行完毕后再执行

keepAliveTime:线程空闲超时,当线程池中没有任务或者任务的数量小于线程数量的时候,如果当前线程池中的线程数量>corePoolSize,那么多余的线程将会被关闭,直到线程池中的数量=corePoolSize。(PS:在该类中还提供了一个特殊的接口,通过该接口可以设置是否允许核心线程超时,如果允许核心线程超时,那么线程池中所有超时的线程都将会被关闭)

public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value)
interruptIdleWorkers();
}
}

threadFactory:线程工厂,用于生产线程,线程池中所有的线程都来自于线程工厂。开发中可以通过自定义线程工厂,实现自定义线程的实现,自定义线程的实现方便后续的开发调试,以及允许环境的问题定位。

handler:是任务拒绝异常处理器,当线程池没有可用的空闲的线程以及线程队列已满的情况,线程池无法接收任务,这种情况下线程池会调用handler进行后续的处理:

/**
* Invokes the rejected execution handler for the given command.
* Package-protected for use by ScheduledThreadPoolExecutor.
*/
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}

如果开发者没有没有设置该handler,那么该handler为线程池默认的handler=defaultHandler 

/**
* The default rejected execution handler.
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy()

该hanlder默认抛出拒绝异常RejectedExecutionException,开发者可以通过捕获该异常来判断任务的执行情况,然后作针对性的处理,或者是自定义RejectedExecutionHandler完成任务拒绝后的后续处理。由于异常的处理比较耗时且影响性能,所以根据开发规范:能使用判断避免异常产生的情况,不建议抛出异常,所以建议自定义handler进行任务拒绝后的处理。

/**
* A handler for rejected tasks that throws a
* {@code RejectedExecutionException}.
*/
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }

/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}

 


注意:本文归作者所有,未经作者允许,不得转载

全部评论: 0

    我有话说: