【Java 多线程 3】线程池2

2年前 (2022) 程序员胖胖胖虎阿
304 0 0

一、线程池简介

线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程。每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中。如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙。如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值。超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

使用线程池减少的是线程的创建和销毁时间,因为创建一个对象要获取内存资源,虚拟机跟踪每个对象,以便可以在对象销毁后进行垃圾回收,所以提高服务程序效率的一个重要手段就是尽可能的降低创建和销毁对象的次数,特别是一些非常耗资源的对象创建和销毁。

二、组成部分

1、线程池管理器(ThreadPoolManager):用于创建并管理线程池。包含 创建线程池,销毁线程池,加入新任务;

2、工作线程(WorkThread): 线程池中的线程只有两种状态:可运行状态和等待状态

3、任务接口(Task):每一个任务必须实现的接口<有返回值的callable和无返回值的runnable>,以供工作线程调度任务的运行。它主要规定了任务的入口。任务运行完后的收尾工作,任务的运行状态等。

4、任务队列(work queue):用于存放没有处理的任务。提供一种缓冲机制,一般是BlockingQuene的实现类。

三、Java线程池的原理和实现

创建线程有两种方式:继承Thread或实现Runnable。

Thread实现了Runnable接口。提供了一个空的run()方法。所以不论是继承Thread还是实现Runnable,都要有自己的run()方法。

一个线程创建后就存在。调用start()方法就开始执行(执行run()方法)。调用wait进入等待或调用sleep进入休眠期,顺利执行完成或休眠被中断或执行过程中出现异常而退出。

线程池添加任务过程:

5b7c7db190d6f67f3bdca4a8c7edb4729a5.jpg

我们首先从线程池的创建说起,Executors.newFixedThreadPool(2)表示创建一个具有两个线程的线程池,源代码如下:

public class Executors {
    //生成一个最大为nThreads的线程池执行器
  public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
}

这里使用了LinkedBlockingQueue作为队列任务管理器,所有等待处理的任务都会放在该对列中,需要注意的是,此队列是一个阻塞式的单端队列。线程池建立好了,那就需要线程在其中运行了,线程池中的线程是在submit第一次提交任务时建立的,代码如下:

    public Future<?> submit(Runnable task) {
        //检查任务是否为null
        if (task == null) throw new NullPointerException();
        //把Runnable任务包装成具有返回值的任务对象,不过此时并没有执行,只是包装
        RunnableFuture<Object> ftask = newTaskFor(task, null);
        //执行此任务
        execute(ftask);
        //返回任务预期执行结果
        return ftask;
    }

此处的代码关键是execute方法,它实现了三个职责。

1、创建足够多的工作线程数,不大于最大线程数量,并保持线程处于运行或等待状态。

2、把等待处理的任务放到任务队列中

3、从任务队列中取出来执行

其中此处的关键是工作线程的创建,它也是通过new Thread方式创建的一个线程,只是它创建的并不是我们的任务线程(虽然我们的任务实现了Runnable接口,但它只是起了一个标志性的作用),而是经过包装的Worker线程,代码如下:  

private final class Worker implements Runnable {
// 运行一次任务
    private void runTask(Runnable task) {
        /* 这里的task才是我们自定义实现Runnable接口的任务 */
        task.run();
        /* 该方法其它代码略 */
    }
    // 工作线程也是线程,必须实现run方法
    public void run() {
        try {
            Runnable task = firstTask;
            firstTask = null;
            while (task != null || (task = getTask()) != null) {
                runTask(task);
                task = null;
            }
        } finally {
            workerDone(this);
        }
    }
    // 任务队列中获得任务
    Runnable getTask() {
        /* 其它代码略 */
        for (;;) {
            return r = workQueue.take();
        }
    }
}

此处为示意代码,删除了大量的判断条件和锁资源。execute方法是通过Worker类启动的一个工作线程,执行的是我们的第一个任务,然后改线程通过getTask方法从任务队列中获取任务,之后再继续执行,但问题是任务队列是一个BlockingQuene,是阻塞式的,也就是说如果该队列的元素为0,则保持等待状态,直到有任务进入为止,我们来看LinkedBlockingQuene的take方法,代码如下:  

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            try {
                // 如果队列中的元素为0,则等待
                while (count.get() == 0)
                    notEmpty.await();
            } catch (InterruptedException ie) {
                notEmpty.signal(); // propagate to a non-interrupted thread
                throw ie;
            }
            // 等待状态结束,弹出头元素
            x = extract();
            c = count.getAndDecrement();
            // 如果队列数量还多于一个,唤醒其它线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        // 返回头元素
        return x;
    }

分析到这里,我们就明白了线程池的创建过程:创建一个阻塞队列以容纳任务,在第一次执行任务时创建做够多的线程(不超过许可线程数),并处理任务,之后每个工作线程自行从任务对列中获得任务,直到任务队列中的任务数量为0为止,此时,线程将处于等待状态,一旦有任务再加入到队列中,即召唤醒工作线程进行处理,实现线程的可复用性。

使用线程池减少的是线程的创建和销毁时间,因为创建一个对象要获取内存资源,虚拟机跟踪每个对象,以便可以在对象销毁后进行垃圾回收,所以提高服务程序效率的一个重要手段就是尽可能的降低创建和销毁对象的次数,特别是一些非常耗资源的对象创建和销毁。
这对于多线程应用来说非常的有帮助,比如我们常用的servlet容器,每次请求处理的都是一个线程,如果不采用线程池技术,每次请求都会重新创建一个新的线程,这会导致系统的性能负荷加大,影响效率下降,主要是很low,弱爆了,这代码谁写的,给我出来,哈哈。

四、适时选择不同的线程池来实现

Java的线程池包括ThreadPoolExecutor类和ScheduledThreadPoolExecutor类。

为了简化,还提供了Exceutors的静态类,它可以直接生成不同的线程池执行器,比如单线程执行器、带缓冲功能的执行器等。

为了理解这些执行器,我们首先来看看ThreadPoolExecutor类,其中它复杂的构造函数可以很好的理解线程池的作用,代码如下:

// Java线程池的完整构造函数
public ThreadPoolExecutor(
  int corePoolSize, // 线程池长期维持的最小线程数,即使线程处于Idle状态,也不会回收。
  int maximumPoolSize, // 线程数的上限
  long keepAliveTime, // 线程最大生命周期。
  TimeUnit unit, //时间单位                                 
  BlockingQueue<Runnable> workQueue, //任务队列。当线程池中的线程都处于运行状态,而此时任务数量继续增加,则需要一个容器来容纳这些任务,这就是任务队列。
  ThreadFactory threadFactory, // 线程工厂。定义如何启动一个线程,可以设置线程名称,并且可以确认是否是后台线程等。
  RejectedExecutionHandler handler) // 拒绝任务处理器。由于超出线程数量和队列容量而对继续增加的任务进行处理的程序。

这是ThreadPoolExecutor最完整的构造函数,其他的构造函数都是引用该构造函数实现的。

线程池的管理是这样一个过程:首先创建线程池,然后根据任务的数量逐步将线程增大到corePoolSize数量,如果此时仍有任务增加,则放置到workQuene中,直到workQuene爆满为止,然后继续增加池中的数量(增强处理能力),最终达到maximumPoolSize,那如果此时还有任务增加进来呢?这就需要handler处理了,或者丢弃任务,或者拒绝新任务,或者挤占已有任务等。

在任务队列和线程池都饱和的情况下,一但有线程处于等待(任务处理完毕,没有新任务增加)状态的时间超过keepAliveTime,则该线程终止,也就说池中的线程数量会逐渐降低,直至为corePoolSize数量为止。

向线程池提交一个任务后,它的主要处理流程如下图所示:

5fc9cf0a48a4ad2da37b26856660a7b5205.jpg

我们可以把线程池想象为这样一个场景:在一个生产线上,车间规定是可以有corePoolSize数量的工人,但是生产线刚建立时,工作不多,不需要那么多的人。随着工作数量的增加,工人数量也逐渐增加,直至增加到corePoolSize数量为止。此时还有任务增加怎么办呢?

好办,任务排队,corePoolSize数量的工人不停歇的处理任务,新增加的任务按照一定的规则存放在仓库中(也就是我们的workQuene中),一旦任务增加的速度超过了工人处理的能力,也就是说仓库爆满时,车间就会继续招聘工人(也就是扩大线程数),直至工人数量到达maximumPoolSize为止,那如果所有的maximumPoolSize工人都在处理任务时,而且仓库也是饱和状态,新增任务该怎么处理呢?这就会扔一个叫handler的专门机构去处理了,它要么丢弃这些新增的任务,要么无视,要么替换掉别的任务。

过了一段时间后,任务的数量逐渐减少,导致一部分工人处于待工状态,为了减少开支(Java是为了减少系统的资源消耗),于是开始辞退工人,直至保持corePoolSize数量的工人为止,此时即使没有工作,也不再辞退工人(池中的线程数量不再减少),这也是保证以后再有任务时能够快速的处理。

明白了线程池的概念,我们再来看看Executors提供的几个线程创建线程池的便捷方法:

① newSingleThreadExecutor:单线程池。

顾名思义就是一个池中只有一个线程在运行,该线程永不超时,而且由于是一个线程,当有多个任务需要处理时,会将它们放置到一个无界阻塞队列中逐个处理,它的实现代码如下:  

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
             new LinkedBlockingQueue<Runnable()));
}

它的使用方法也很简单,下面是简单的示例:

public static void main(String[] args) throws ExecutionException,InterruptedException {
    // 创建单线程执行器
    ExecutorService es = Executors.newSingleThreadExecutor();
    // 执行一个任务
    Future<String> future = es.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            return "";
        }
    });
    // 获得任务执行后的返回值
    System.out.println("返回值:" + future.get());
    // 关闭执行器
    es.shutdown();
}

② newCachedThreadPool:缓冲功能的线程。

建立了一个线程池,而且线程数量是没有限制的(当然,不能超过Integer的最大值),新增一个任务即有一个线程处理,或者复用之前空闲的线程,或者重亲启动一个线程,但是一旦一个线程在60秒内一直处于等待状态时(也就是一分钟无事可做),则会被终止,其源码如下: 

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
}

这里需要说明的是,任务队列使用了同步阻塞队列,这意味着向队列中加入一个元素,即可唤醒一个线程(新创建的线程或复用空闲线程来处理),这种队列已经没有队列深度的概念了。

③ newFixedThreadPool:固定线程数量的线程池。

在初始化时已经决定了线程的最大数量,若任务添加的能力超出了线程的处理能力,则建立阻塞队列容纳多余的任务,其源码如下: 

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

上面返回的是一个ThreadPoolExecutor,它的corePoolSize和maximumPoolSize是相等的,也就是说,最大线程数量为nThreads。如果任务增长的速度非常快,超过了LinkedBlockingQuene的最大容量(Integer的最大值),那此时会如何处理呢?会按照ThreadPoolExecutor默认的拒绝策略(默认是DiscardPolicy,直接丢弃)来处理。

以上三种线程池执行器都是ThreadPoolExecutor的简化版,目的是帮助开发人员屏蔽过得线程细节,简化多线程开发。当需要运行异步任务时,可以直接通过Executors获得一个线程池,然后运行任务,不需要关注ThreadPoolExecutor的一系列参数时什么含义。当然,有时候这三个线程不能满足要求,此时则可以直接操作ThreadPoolExecutor来实现复杂的多线程计算。

newSingleThreadExecutor、newCachedThreadPool、newFixedThreadPool是线程池的简化版,而ThreadPoolExecutor则是旗舰版___简化版容易操作,需要了解的知识相对少些,方便使用,而旗舰版功能齐全,适用面广,难以驾驭。

五、线程池的关闭

我们可以通过调用线程池的shutdown或shutdownNow方法来关闭线程池,但是它们的实现原理不同,shutdown的原理是只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程。shutdownNow的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。shutdownNow会首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。

只要调用了这两个关闭方法的其中一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。至于我们应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow。

六、线程池的好处

1、降低资源消耗

通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

2、提高响应速度

3、提高线程的可管理性

线程时稀缺资源,如果无限的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配、调优和监控。

七、线程池的应用范围

1、需要大量线程来完成任务,且完成任务的时间比较短

网页(http)请求这种任务,使用线程池技术是很合适的。由于单个任务小,而任务数量巨大,你能够想象一个热门站点的点击次数。 但对于长时间的任务,比方一个Telnet连接请求,线程池的长处就不明显了。由于Telnet会话时间比线程的创建时间大多了。

2、对性能要求苛刻的应用,比方要求server迅速响应客户请求。

3、接受突发性的大量请求,但不至于使server因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,尽管理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,并出现"OutOfMemory"的错误。

八、总结

线程池通过线程的复用减少了线程创建和销毁的开销,通过使用任务队列避免了线程的阻塞从而避免了线程调度和线程上下文切换的开销。

往期精彩内容:

Java知识体系总结(2021版)

Java多线程基础知识总结(绝对经典)

【全栈最全Java框架总结】SSH、SSM、Springboot

超详细的springBoot学习笔记

常见数据结构与算法整理总结

Java设计模式:23种设计模式全面解析(超级详细)

Java面试题总结(附答案)

版权声明:程序员胖胖胖虎阿 发表于 2022年10月6日 上午5:32。
转载请注明:【Java 多线程 3】线程池2 | 胖虎的工具箱-编程导航

相关文章

暂无评论

暂无评论...