线程池的执行流程

2年前 (2022) 程序员胖胖胖虎阿
274 0 0

线程池的执行流程

如果所示,就是线程池的执行过程,可以分为三个主要步骤:

1.提交任务后会首先进行当前工作线程数与核心线程数的比较,如果当前工作线程数小于核心线程数,则直接调用 addWorker() 方法创建一个核心线程去执行任务;

2.如果工作线程数大于核心线程数,即线程池核心线程数已满,则新任务会被添加到阻塞队列中等待执行,当然,添加队列之前也会进行队列是否为空的判断;

3.如果线程池里面存活的线程数已经等于核心线程数了,且阻塞队列已经满了,再会去判断当前线程数是否已经达到最大线程数 maximumPoolSize,如果没有达到,则会调用 addWorker() 方法创建一个非核心线程去执行任务;

4.如果当前线程的数量已经达到了最大线程数时,当有新的任务提交过来时,会执行拒绝策略

总结来说就是优先核心线程、阻塞队列次之,最后非核心线程

java.util.concurrent.ThreadPoolExecutor#execute 的源码为:

 public void execute(Runnable command) {
     if (command == null)
         throw new NullPointerException();
     /*
      * Proceed in 3 steps:
      *
      * 1. If fewer than corePoolSize threads are running, try to
      * start a new thread with the given command as its first
      * task.  The call to addWorker atomically checks runState and
      * workerCount, and so prevents false alarms that would add
      * threads when it shouldn't, by returning false.
      *
      * 2. If a task can be successfully queued, then we still need
      * to double-check whether we should have added a thread
      * (because existing ones died since last checking) or that
      * the pool shut down since entry into this method. So we
      * recheck state and if necessary roll back the enqueuing if
      * stopped, or start a new thread if there are none.
      *
      * 3. If we cannot queue task, then we try to add a new
      * thread.  If it fails, we know we are shut down or saturated
      * and so reject the task.
      */
     int c = ctl.get();
     if (workerCountOf(c) < corePoolSize) {
         if (addWorker(command, true))
             return;
         c = ctl.get();
     }
     if (isRunning(c) && workQueue.offer(command)) {
         int recheck = ctl.get();
         if (! isRunning(recheck) && remove(command))
             reject(command);
         else if (workerCountOf(recheck) == 0)
             addWorker(null, false);
     }
     else if (!addWorker(command, false))
         reject(command);
 }

可以看到,源码中的注释部分也说明了整个 excute() 方法的执行过程可分为三个步骤。

源码中变量 ctl 是一个原子类,主要作用是用来保存线程数量和线程池的状态。这里采用高 3 位来保存运行状态,低 29 位来保存线程数量。

在这几个步骤中,最重要的当属 addWorker() 方法了。

private boolean addWorker(Runnable firstTask, boolean core);

第一个参数表示要执行的任务,如果为 null,则从阻塞队列中拉取任务;

第二个参数表示是否是核心线程,用来控制 addWorker() 的流程。

addWorker() 方法的实现主流程为:

线程池的执行流程

源码为:

private boolean addWorker(Runnable firstTas
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
        // Check if queue empty only if nec
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;
        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize 
                return false;
            if (compareAndIncrementWorkerCo
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to worke
        }
    }
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = 
            mainLock.lock();
            try {
                // Recheck while holding lo
                // Back out on ThreadFactor
                // shut down before lock ac
                int rs = runStateOf(ctl.get
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firs
                    if (t.isAlive()) // pre
                        throw new IllegalTh
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize
                        largestPoolSize = s
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWorker() 执行过程中会反复使用 runStateOf() 和 workerCountOf() 来获取线程池的状态和工作线程的数量。

本文参考自:三分钟弄懂线程池执行过程 - 掘金

版权声明:程序员胖胖胖虎阿 发表于 2022年9月15日 下午1:56。
转载请注明:线程池的执行流程 | 胖虎的工具箱-编程导航

相关文章

暂无评论

暂无评论...