线程池详解

2年前 (2022) 程序员胖胖胖虎阿
239 0 0

一、线程池的优势(为什么使用线程池)

1、降低系统资源消耗, 通过重用已存在的线程, 降低线程创建和销毁造成的消耗;
2、提高系统响应速度, 当有任务到达时, 无需等待新线程的创建便能立即执行;
3、方便线程并发数的管控, 线程若是无限制的创建, 不仅会额外消耗大量系统资源, 更是
占用过多资源而阻塞系统或内存不足等状况, 从而降低系统的稳定性。 线程池能有效管控线
程, 统一分配、 调优, 提供资源使用率;
4、更强大的功能, 线程池提供了定时、 定期以及可控线程数等功能的线程池, 使用方便简

二、线程池的执行流程

线程池详解
1、线程池判断核心线程池里的线程是否都在执行任务。如果不是,则创建一个新的工作
线程来执行任务。如果核心线程池里的线程都在执行任务,则进入下个流程。
2、线程池判断工作队列是否已经满。如果工作队列没有满,则将新提交的任务存储在这
个工作队列里。如果工作队列满了,则进入下个流程。
3、线程池判断线程池的线程是否都处于工作状态。如果没有,则创建一个新的工作线程
来执行任务。如果已经满了,则交给饱和策略来处理这个任务。

三、创建线程的方式以及线程池的核心参数

1、使用ThreadPoolExecutor创建

ExectorService service = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,milliseconds,runnableTaskQueue, handler);

创建线程池时的几个参数,如下。
(1)corePoolSize(线程池的基本大小):当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任
务数大于线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法,
线程池会提前创建并启动所有基本线程。
(2)runnableTaskQueue(任务队列):用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列。

  • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按FIFO(先进先出)原则对元素进行排序。
  • ·LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO排序元素,吞吐量通常要高于ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
  • SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于Linked-BlockingQueue,静态工厂方法Executors.newCachedThreadPool使用了这个队列。
  • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

(3)maximumPoolSize(线程池最大数量):线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是,如果使用了无界的任务队列这个参数就没什么效果。

(4)ThreadFactory用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字。使用开源框架guava提供的ThreadFactoryBuilder可以快速给线程池里的线程设置有意义的名字,代码如下。new ThreadFactoryBuilder().setNameFormat(“XX-task-%d”).build();
(5)RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是AbortPolicy,表示无法处理新任务时抛出异常。在JDK 1.5中Java线程池框架提供了以下4种策略。

  • AbortPolicy:直接抛出异常
  • CallerRunsPolicy:只用调用者所在线程来运行任务。
  • DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
  • DiscardPolicy:不处理,丢弃掉。
  • 当然,也可以根据应用场景需要来实现RejectedExecutionHandler接口自定义策略。如记录日志或持久化存储不能处理的任务

(6)keepAliveTime(线程活动保持时间):线程池的工作线程空闲后,保持存活的时间。所以,如果任务很多,并且每个任务执行的时间比较短,可以调大时间,提高线程的利用率。
(7)TimeUnit(线程活动保持时间的单位):可选的单位有天(DAYS)、小时(HOURS)、分钟(MINUTES)、毫秒(MILLISECONDS)、微秒(MICROSECONDS,千分之一毫秒)和纳秒(NANOSECONDS,千分之一微秒)。

2、通过Executors的静态工厂方法创建线程池

(1)ThreadPoolExecutor

  • SingleThreadExecutor:单个线程,适用于保证顺序的执行各个任务,并且在任意时间点,不会有多个线程是活动的应用场景
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
  • FixedThreadExecutor:固定线程数大小适用于满足资源管理的需求,而需要限制当先线程数数量的应用场景,它适用与负载比较重的服务器
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

①如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务。
②在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入
LinkedBlockingQueue。
③线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行

  • CachedThreadPool:大小无界的线程池,适用于执行很多的短期异步任务的小程序,或者负载较轻的服务器。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

SynchronousQueue作为线程池的工作队列,但CachedThreadPool的maximumPool是无界的。这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。
(2)ScheduledThreadPool

  • ScheduledThreadPoolExecutor:其中包含若干个线程,适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程的数量的。
    ScheduledFutureTask主要包含3个成员变量,如下。
    long型成员变量time,表示这个任务将要被执行的具体时间。
    long型成员变量sequenceNumber,表示这个任务被添加ScheduledThreadPoolExecutor中的序号。
    ·long型成员变量period,表示任务执行的间隔周期
    DelayQueue封装了一个PriorityQueue,这个PriorityQueue会对队列中ScheduledFutureTask进行排序。排序时,time小的排在前面(时间早的任务将被先执行)。如果两个ScheduledFutureTask的time相同,就比较sequenceNumber,sequenceNumber小的排在前面(也就是说,如果两个任务的执行时间相同,那么先提交的任务将被先执行)。
    任务执行的步骤:
    1)线程1从DelayQueue中获取已到期的ScheduledFutureTask(DelayQueue.take())。到期任务是指ScheduledFutureTask的time大于等于当前时间。
    2)线程1执行这个ScheduledFutureTask。
    3)线程1修改ScheduledFutureTask的time变量为下次将要被执行的时间。
    4)线程1把这个修改time之后的ScheduledFutureTask放回DelayQueue中(DelayQueue.add())
  • SingleThreadScheduledExecutor.包含一个线程,适用于单个线程执行周期任务,同时需要保证顺序执行各个任务的应用场景。

四、像线程池提交任务和关闭线程池

1、向线程池提交任务

(1)execute()方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。
(2)submit()方法用于提交需要返回值的任务。线程池会返回一个future类型的对象,通过这个future对象可以判断任务是否执行成功,并且可以通过future的get()方法来获取返回值,get()方法会阻塞当前线程直到任务完成,而使用get(long timeout,TimeUnit unit)方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。

2、关闭线程池

原理是遍历线
程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务
可能永远无法终止
(1)shutdownNow首先将线程池的状态设置成STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表
(2)shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程
只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务
都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true.通常调用shutdown方法来关闭
线程池,如果任务不一定要执行完,则可以调用shutdownNow方法

四、线程池的参数配置思路

线程池配置参数的思路:
要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析。

  • 任务的性质:CPU密集型任务、IO密集型任务和混合型任务。
  • 任务的优先级:高、中和低
  • 任务的执行时间:长、中和短。
  • 任务的依赖性:是否依赖其他系统资源,如数据库连接。

性质不同的任务可以用不同规模的线程池分开处理。CPU密集型任务应配置尽可能小的
线程,如配置Ncpu+1个线程的线程池。由于IO密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如2*Ncpu。混合型的任务,如果可以拆分,将其拆分成一个CPU密集型任务和一个IO密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。如果这两个任务执行时间相差太大,则没必要进行分解。可以通过Runtime.getRuntime().availableProcessors()方法获得当前设备的CPU个数。

优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理。它可以让优先级高
的任务先执行。 如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。

执行时间不同的任务可以交给不同规模的线程池来处理,或者可以使用优先级队列,让
执行时间短的任务先执行。

依赖数据库连接池的任务,因为线程提交SQL后需要等待数据库返回结果,等待的时间越
长,则CPU空闲时间就越长,那么线程数应该设置得越大,这样才能更好地利用CPU。建议使用有界队列。有界队列能增加系统的稳定性和预警能力,可以根据需要设大一点儿,比如几千。系统里后台任务线程池的队列和线程池全满了,不断抛出抛弃任务的异常,通过排查发现是数据库出现了问题,导致执行SQL变得非常缓慢,因为后台任务线程池里的任务全是需要向数据库查询和插入数据的,所以导致线程池里的工作线程全部阻塞,任务积压在线程池里。如果当时我们设置成无界队列,那么线程池的队列就会越来越多,有可能会撑满内存,导致整个系统不可用,而不只是后台任务出现问题。当然,我们的系统所有的任务是用单独的服务器部署的,我们使用不同规模的线程池完成不同类型的任务,但是出现这样问题时也会影响到其他任务。
初次的参数配置
(1)当前的服务器只为当前的线程池服务,线程池能够占用服务器所有的资源,根据当前的是cpu密集型的还是IO密集型的,cpu密集型的话就可以选择与当前cpu核数相同或者大一点的。Io密集型的就可以选择2N,2N+1,2N+2,更具之前的理论基础和经验总结,进行第一轮压测,然后根据压测结果设置线程池参数
(2)分布式的微服务环境下,项目重有5个接口,2个重要接口,3个可以作为降级接口。保障其他俩个接口可以多的使用服务器的资源,一个重要接口里面需要创建一个线程池,提高单接口的处理速度,如何设置参数。不能单单看cpu核数:
①拿到线上实际生产高并发访问压力下该服务的所有接口的访问比例,如果一秒钟中有2000个请求过来,各个接口都要承载一定比例的请求,假如为2:3:1:1:1。其中一个重要接口的需要占用服务器资源的3/8,,这时可以确定最大线程数为服务器线程资源的3/8,当前服务器在峰值情况下到底能同时承受多少线程,需要参照cpu核树所有接口的qps,需要衡量当前服务器整体服务器的最大qps,以及当前服务器同一时间的正在执行的活跃线程数。当前活跃线程数就是我们当前面临的需求,最大访问量下的最大线程数活跃数,可以将其设置为当前服务器的最大线程数,就可以确定当前线程数的值。
②最大线程数确定后需要考虑核心线程数,核心线程数在初次设置时应该与最大线程数保持一致,因为要进行线程数最大并发访问能力的压测,看他能够否撑住当前访问量,可以撑住就可以调小核心线程数,撑不住可以进行设置queue的大小(无界队列,但无界队列会导致最大线程数参数失效,失去了线程池使用的意义,还有可以能会导致线程堆积,最终导致OOM)
③设置有界队列的参数,默认使用Integer.maxValue,高峰访问时段和压测进行匹配,访问时段多久压测就进行多久,为了业务扩充根据需求改变设置,时段的提出是为了最大线程数进行配置,计算当前接口一秒钟可以处理多少数据,计算整个时段这个接口可能处理多少数据,然后看实际生产数据,当前节点服务所承载的最大值,然后用整个时段进来的访问总量 一 整个时段接口所能处理的数据值。用剩余的值作为有界队列的上限值,在此基础上可以提高50%,解决访问不均匀的问题。如果访问非常均匀就可以适当降低冗余量,比如只在原来的基础上提高10%;如果是锯齿状的访问需求,需要讲queue的上限调整为锯齿状的最高点。
④最后进行压测,如果达到了需求就可以适当的减少的线程数,不要浪费无用的资源,涉及其他接口的降级,或者线程池配置不当引发的高可用性问题
⑤如果没有达到预期,如果已经给了最大的线程资源数,还是没有达到预期,一是再次提高queue的Limit上限存储看看还有多少没有处理,再次调大队列可以进行一个缓冲,如果业务能够满足,并且业务需求方同意的就可以。用空间内存置换当前接口的稳定运行,并且不影响
其他接口。二是queue不能设置太大,需要尽快处理,就需要考虑单机单接口问题,这时就需要调整代码,调整当前的并发性,db瓶颈,或者将当前接口调整为异步的,或者对消息的处理进行拆分,放到rabbiteMQ里面,由下游服务处理,然后从消息队列里面进行多线程的处理,来提高当前接口的高并发处理能力,去处理完目前要求的请求。三是可以适当的调大一点点当前接口线程数,不能以当前接口的利益为中心而毁坏了其他接口持有服务器资源的权力。因为其他接口也承受着高并发的请求。因为在非常高的请求下可降级接口已经降级,会将线程资源空出来,所以重要接口可以适当的调高一点点线程数,但是十分不建议。最好的处理办法是增加节点,而不是增加线程数引起当前其他的接口的问题。
⑥时间参数keepalive 看有没有触及到最大线程数,没有触及选择默认,如果触及到且时间很长,可已将时间设置长一点确保最大线程数不会影响到其他服务接口。

如果增大了当前重要接口的线程数会引起什么问题:
①如果当前线程池参数设置的国过小,会导致队列满了,最大线程数也处理不了,到了最大饱和策略,如过饱和策略是抛出异常,会导致当前接口异常比例升高,如果当前接口的异常比例过高就会导致当前接口熔断或者降级,本来服务器资源足够用但是由于线程设置过小导致当前接口异常熔断或者降级。
②如果线程数参数设置过大,会导致其他接口调用超时或者调用结果不能及时返回导致更多请求积压进而导致系统崩溃。
总结:先确定最大线程数,然后核心线程数和最大线程数保持一致,进行压测,queue用生产上时段最高的访问量 一 单接口在时间段能够处理的最大数量,设置一定的容量进行压测,如果可以达到减小queue大小,达不到要么增加queue容量然后慢慢处理,要么优化接口的处理速度,消费快一点,或者增加节点和机器。线程池配置完全基于压测,压测需要找到一个指标,需要一个依据进行压测。

使用无界队列作为工作队列会对线程池带来如下影响
1)当线程池中的线程数达到corePoolSize后,新任务将在无界队列中等待,因此线程池中
的线程数不会超过corePoolSize。
2)由于1,使用无界队列时maximumPoolSize将是一个无效参数。
3)由于1和2,使用无界队列时keepAliveTime将是一个无效参数。
4)由于使用无界队列,运行中的FixedThreadPool(未执行方法shutdown()或
shutdownNow())不会拒绝任务(不会调用RejectedExecutionHandler.rejectedExecution方法)。

版权声明:程序员胖胖胖虎阿 发表于 2022年11月8日 下午9:16。
转载请注明:线程池详解 | 胖虎的工具箱-编程导航

相关文章

暂无评论

暂无评论...