如果所示,就是线程池的执行过程,可以分为三个主要步骤:
1.提交任务后会首先进行当前工作线程数与核心线程数的比较,如果当前工作线程数小于核心线程数,则直接调用 addWorker() 方法创建一个核心线程去执行任务;
2.如果工作线程数大于核心线程数,即线程池核心线程数已满,则新任务会被添加到阻塞队列中等待执行,当然,添加队列之前也会进行队列是否为空的判断;
3.如果线程池里面存活的线程数已经等于核心线程数了,且阻塞队列已经满了,再会去判断当前线程数是否已经达到最大线程数 maximumPoolSize,如果没有达到,则会调用 addWorker() 方法创建一个非核心线程去执行任务;
4.如果当前线程的数量已经达到了最大线程数时,当有新的任务提交过来时,会执行拒绝策略
总结来说就是优先核心线程、阻塞队列次之,最后非核心线程。
java.util.concurrent.ThreadPoolExecutor#execute 的源码为:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
可以看到,源码中的注释部分也说明了整个 excute() 方法的执行过程可分为三个步骤。
源码中变量 ctl 是一个原子类,主要作用是用来保存线程数量和线程池的状态。这里采用高 3 位来保存运行状态,低 29 位来保存线程数量。
在这几个步骤中,最重要的当属 addWorker() 方法了。
private boolean addWorker(Runnable firstTask, boolean core);
第一个参数表示要执行的任务,如果为 null,则从阻塞队列中拉取任务;
第二个参数表示是否是核心线程,用来控制 addWorker() 的流程。
addWorker() 方法的实现主流程为:
源码为:
private boolean addWorker(Runnable firstTas
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if nec
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize
return false;
if (compareAndIncrementWorkerCo
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to worke
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock =
mainLock.lock();
try {
// Recheck while holding lo
// Back out on ThreadFactor
// shut down before lock ac
int rs = runStateOf(ctl.get
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firs
if (t.isAlive()) // pre
throw new IllegalTh
workers.add(w);
int s = workers.size();
if (s > largestPoolSize
largestPoolSize = s
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
addWorker() 执行过程中会反复使用 runStateOf() 和 workerCountOf() 来获取线程池的状态和工作线程的数量。
本文参考自:三分钟弄懂线程池执行过程 - 掘金