基于ThreadPoolTaskExecutor的使用说明

网友投稿 1249 2022-11-26

基于ThreadPoolTaskExecutor的使用说明

基于ThreadPoolTaskExecutor的使用说明

目录ThreadPoolTaskExecutor的使用springboot 配置提交任务ThreadPoolTaskExecutor配置问题有关spring中ThreadPoolTaskExecutor具体如下回忆一下线程池工作原理测试场景1测试场景2

ThreadPoolTaskExecutor的使用

当我们需要实现并发、异步等操作时,通常都会使用到ThreadPoolTaskExecutor,现对其使用稍作总结。

springboot 配置

提交任务

无返回值的任务使用execute(Runnable)

有返回值的任务使用submit(Runnable)

处理流程

当一个任务被提交到线程池时,首先查看线程池的核心线程是否都在执行任务,否就选择一条线程执行任务,是就执行第二步。

查看核心线程池是否已满,不满就创建一条线程执行任务,否则执行第三步。

查看任务队列是否已满,不满就将任务存储在任务队列中,否则执行第四步。

查看线程池是否已满,不满就创建一条线程执行任务,否则就按照策略处理无法执行的任务。

在ThreadPoolExecutor中表现为:

如果当前运行的线程数小于corePoolSize,那么就创建线程来执行任务(执行时需要获取全局锁)。

如果运行的线程大于或等于corePoolSize,那么就把task加入BlockQueue。

如果创建的线程数量大于BlockQueue的最大容量,那么创建新线程来执行该任务。

如果创建线程导致当前运行的线程数超过maximumPoolSize,就根据饱和策略来拒绝该任务。

关闭线程池

调用shutdown或者shutdownNow,两者都不会接受新的任务,而且通过调用要停止线程的interrupt方法来中断线程,有可能线程永远不会被中断,不同之处在于shutdownNow会首先将线程池的状态设置为STOP,然后尝试停止所有线程(有可能导致部分任务没有执行完)然后返回未执行任务的列表。而shutdown则只是将线程池的状态设置为shutdown,然后中断所有没有执行任务的线程,并将剩余的任务执行完。

配置线程个数

如果是CPU密集型任务,那么线程池的线程个数应该尽量少一些,一般为CPU的个数+1条线程。

如果是IO密集型任务,那么线程池的线程可以放的很大,如2*CPU的个数。

对于混合型任务,如果可以拆分的话,通过拆分成CPU密集型和IO密集型两种来提高执行效率;如果不能拆分的的话就可以根据实际情况来调整线程池中线程的个数。

监控线程池状态

常用状态

taskCount:线程需要执行的任务个数。

completedTaskCount:线程池在运行过程中已完成的任务数。

largestPoolSize:线程池曾经创建过的最大线程数量。

getPoolSize:获取当前线程池的线程数量。

getActiveCount:获取活动的线程的数量

通过继承线程池,重写beforeExecute,afterExecute和terminated方法来在线程执行任务前,线程执行任务结束,和线程终结前获取线程的运行情况,根据具体情况调整线程池的线程数量。

ThreadPoolTaskExecutor配置问题

最近线上出现一个奇葩问题,使用的是ThreadPoolTaskExecutor来处理后续服务调用,刚开始运行ThreadPoolTaskExecutor处理后续服务调用是没有问http://题的,但是一段时间之后,发现后续服务一直没有被调用,导致了极其严重的后果

有关spring中ThreadPoolTaskExecutor具体如下

class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">

class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">

那就不得不了解一下java.util.concurrent包下Executor构架了

回忆一下线程池工作原理

如果当前运行的线程少于corePoolSize,则创建新线程来执行任务(需要获得全局锁)

如果运行的线程等于或多于corePoolSize ,则将任务加入BlockingQueue

如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务(需要获得全局锁)

如果创建新线程将使当前运行的线程超出maxiumPoolSize,任务将被拒绝,并调用

RejectedExecutionHandler.rejectedExecution()方法

测试场景1

首先,注释queueCapacity的一行

任务:

public class CustomRunnable implements Runnable {

private int id;

public CustomRunnable(int id) {

this.id = id;

}

@Override

public void run() {

try {

System.out.println("begin execute "+ Thread.currentThread().getName()

+ "-- task id: "+ id);

String rs = ClientUtil.get("http://****.com");

System.out.println("end execute task: "+ id);

} catch (Exception e) {

e.printStackTrace();

}

}

}

测试案例:

@Test

public void threadTest() throws InterruptedException {

for (int i=0; i< 35; i++){

Thread t= new Thread(new CustomRunnable(i));

executor.execute(t);

}

Thread.sleep(1800000);

}

测试结果:

七月 09, 2018 5:46:47 下午 org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor initialize

信息: Initializing ExecutorService 'threadPoolTaskExecutor'

begin execute threadPoolTaskExecutor-1-- task id: 0

begin execute threadPoolTaskExecutor-2-- task id: 1

begin execute threadPoolTaskExecutor-3-- task id: 2

begin execute threadPoolTaskExecutor-4-- task id: 3

begin execute threadPoolTaskExecutor-5-- task id: 4

end execute task: 4

begin execute threadPoolTaskExecutor-5-- task id: 5

end execute task: 1

begin execute threadPoolTaskExecutor-2-- task id: 6

end execute task: 0

begin execute threadPoolTaskExecutor-1-- task id: 7

end execute task: 2

begin execute threadPoolTaskExecutor-3-- task id: 8

end execute task: 3

begin execute threadPoolTaskExecutor-4-- task id: 9

...

可以发现,一开始线程池就创建了corePoolSize大小的线程,对于之后的新加进的任务,就放到BlockingQueue中,默认是使用LinkedBlockingQueue,大小是Integer.MAX_VALUE,因为队列大小太大,所以就不会创建maxPoolSize大小的线程数量,因此,只有线程处理完当前任务,才会去处理下一个任务,所以,刚加进去的任务得不到立即处理

测试场景2

只需要打开queueCapacity的一行,其他不变

测试结果:

七月 09, 2018 6:07:13 下午 org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor initialize

信息: Initializing ExecutorService 'threadPoolTaskExecutor'

begin execute threadPoolTaskExecutor-1-- task id: 0

begin execute threadPoolTaskExecutor-2-- task id: 1

begin execute threadPoolTaskExecutor-3-- task id: 2

begin execute threadPoolTaskExecutor-4-- task id: 3

begin execute threadPoolTaskExecutor-5-- task id: 4

begin execute threadPoolTaskExecutor-6-- task id: 15

begin execute threadPoolTaskExecutor-7http://-- task id: 16

begin execute threadPoolTaskExecutor-8-- task id: 17

begin execute threadPoolTaskExecutor-9-- task id: 18

begin execute threadPoolTaskExecutor-10-- task id: 19

begin execute threadPoolTaskExecutor-11-- task id: 20

begin execute threadPoolTaskExecutor-12-- task id: 21

begin execute threadPoolTaskExecutor-14-- task id: 23

begin execute threadPoolTaskExecutor-15-- task id: 24

begin execute main-- task id: 26

begin execute threadPoolTaskExecutor-13-- task id: 22

begin execute threadPoolTaskExecutor-16-- task id: 25

begin execute threadPoolTaskExecutor-11-- task id: 5

end execute task: 15

begin execute threadPoolTaskExecutor-6-- task id: 6

end execute task: 23

begin execute threadPoolTaskExecutor-14-- task id: 7

end execute task: 4

begin execute threadPoolTaskExecutor-5-- task id: 8

end execute task: 17

begin execute threadPoolTaskExecutor-8-- task id: 9

....

可以发现,因为初始任务数量大于corePoolSize大小,所以线程池初始化就创建了maxPoolSize大小数量的纯种,对于后续新加进的任务会入到BlockingQueue队列中去,之后等待线程处理完一个任务之后再处理队列中的任务

猜想

线上出现这种原因可能就是因为queueCapacity被设置成了默认(Integer.MAX_VALUE),而且初始化纯种的corePoolSize数量过少,并且线程处理速度较慢(业务逻辑,网络请求等等原因),导致后续任务会一直填加到队列中去,迟迟得不到立即处理。

解决方案

手动设置queueCapacity大小,网络请求原因的话,可以设置超时时间;业务逻辑的话,另辟蹊径。。。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Python flask实战订餐系统微信小程序-23实现登录和修改账号功能
下一篇:Python轻松入门-10 注释和break语句
相关文章

 发表评论

暂时没有评论,来抢沙发吧~