springboot线程池监控的简单实现

网友投稿 1277 2022-11-03

springboot线程池监控的简单实现

springboot线程池监控的简单实现

目录:背景、代码、代码类结构、线程池扩展类、线程工具类、线程bean类、线程池实现类、线程池监控接口类、运行结果

背景

在实际项目开发中,我们常常会为不同优先级的任务设置相应的线程池。一般我们只关注线程池的相关参数,如核心线程数、最大线程数等,容易忽略对线程池实际运行情况的监控。线程池如果像黑盒一样运行,对系统是不利的。本文提供了一种简单获取线程池运行状态的方式,可以将详情打印到日志,或者对接到Prometheus上进行展示。相信有不少博主给出了动态修改线程池的方式,但是由于生产环境禁止这样做,因此本文只提供监控功能。本文代码所在项目的架构为springboot。

代码

代码类结构

ThreadPoolMonitor:线程池扩展类;ThreadPoolUtil:线程池工具类;ThreadPoolDetailInfo:bean类;ExecutorThreadPoolManager:线程池实现类;ThreadPoolController:线程池测试方法

线程池扩展类

该类主要重写了ThreadPoolExecutor类中的shutdown/shutdownNow/beforeExecute/afterExecute方法,用于在每个任务执行前后进行拦截,计算出每个任务的运行时间。

import java.util.Date;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link ThreadPoolExecutor} that logs pool statistics when it is shut down and accumulates
 * the execution time of every task it runs, so that callers can derive an average task duration.
 *
 * <p>Timing is taken in {@link #beforeExecute} / {@link #afterExecute}. Both hooks are invoked by
 * the worker thread that runs the task, so the start timestamp is kept in a {@link ThreadLocal}
 * rather than in a map keyed by the task's hash code (two tasks with the same hash code would
 * otherwise overwrite each other's start time). The running total is an {@link AtomicLong}
 * because many worker threads update it concurrently.
 *
 * <p>Created 2021/12/16 17:45.
 *
 * @author kantlin
 */
public class ThreadPoolMonitor extends ThreadPoolExecutor {

    private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolMonitor.class);

    /** {@link System#nanoTime()} reading taken just before the current thread's task started. */
    private final ThreadLocal<Long> startNanos = new ThreadLocal<>();

    /** Name used to identify this pool in log output. */
    private final String poolName;

    /** Sum of all measured task durations, in nanoseconds. */
    private final AtomicLong totalNanos = new AtomicLong();

    /**
     * Creates a monitored pool; all arguments except {@code poolName} are passed straight to
     * {@link ThreadPoolExecutor#ThreadPoolExecutor(int, int, long, TimeUnit, BlockingQueue, ThreadFactory)}.
     *
     * @param corePoolSize    number of threads to keep in the pool
     * @param maximumPoolSize maximum number of threads allowed in the pool
     * @param keepAliveTime   idle time after which excess threads terminate
     * @param unit            unit of {@code keepAliveTime}
     * @param workQueue       queue holding tasks before they are executed
     * @param threadFactory   factory used to create worker threads
     * @param poolName        name reported in this pool's log messages
     */
    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, String poolName) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.poolName = poolName;
    }

    /** Logs the current counters, then performs an orderly shutdown. */
    @Override
    public void shutdown() {
        LOGGER.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
                this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
        super.shutdown();
    }

    /**
     * Logs the current counters, then attempts to stop all running tasks.
     *
     * @return the tasks that were queued but never started
     */
    @Override
    public List<Runnable> shutdownNow() {
        LOGGER.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
                this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
        return super.shutdownNow();
    }

    /** Records the start time of the task about to run on the calling worker thread. */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        // nanoTime is monotonic, so the measurement is not affected by wall-clock adjustments.
        this.startNanos.set(System.nanoTime());
        super.beforeExecute(t, r);
    }

    /** Adds the elapsed time of the task that just finished to the running total. */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        Long startedAt = this.startNanos.get();
        if (startedAt == null) {
            // No matching beforeExecute was recorded on this thread; nothing to measure.
            return;
        }
        this.startNanos.remove();
        this.totalNanos.addAndGet(System.nanoTime() - startedAt);
    }

    /**
     * Returns the accumulated execution time of all finished tasks.
     *
     * @return total task execution time in milliseconds
     */
    public long getTotalDiff() {
        return TimeUnit.NANOSECONDS.toMillis(this.totalNanos.get());
    }
}

线程工具类

import org.springframework.stereotype.Component;

import java.util.HashMap;

import java.util.concurrent.BlockingQueue;

import java.util.concurrent.ThreadFactory;

import java.util.concurrent.TimeUnit;

/**

* @ClassName ThreadPoolUtil

* @authors kantlin

* @Date 2021/12/16 17:45

**/

@Component

public class ThreadPoolUtil {

private final HashMap threadPoolExecutorHashMap = new HashMap();

public ThreadPoolUtil() {

}

public ThreadPoolMonitor creatThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory,String poolName) {

ThreadPoolMonitor threadPoolExecutor = new ThreadPoolMonitor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, poolName);

this.threadPoolExecutorHashMap.put(poolName, threadPoolExecutor);

return threadPoolExecutor;

}

public HashMap getThreadPoolExecutorHashMap() {

return this.threadPoolExecutorHashMap;

}

线程bean类

import lombok.Data;

// Snapshot of one thread pool's configuration and runtime counters.
// Accessors, equals/hashCode and toString are generated by Lombok's @Data.
@Data

public class ThreadPoolDetailInfo {

// Name of the thread pool

private String threadPoolName;

// Current number of threads in the pool

private Integer poolSize;

// Core number of threads of the pool

private Integer corePoolSize;

// Largest number of threads the pool has held during its lifetime

private Integer largestPoolSize;

// Maximum number of threads the pool is allowed to hold

private Integer maximumPoolSize;

// Number of tasks the pool has completed

private long completedTaskCount;

// Number of currently active threads in the pool

private Integer active;

// Task count of the pool. NOTE(review): the visible controller fills this from
// ThreadPoolExecutor.getTaskCount(), i.e. tasks ever scheduled, not only completed ones.

private long task;

// Maximum idle (keep-alive) time of a thread; the unit is whatever the caller supplies

private long keepAliveTime;

// Share of active threads, as a percentage

private int activePercent;

// Capacity of the task queue (blocking queue)

private Integer queueCapacity;

// Number of tasks currently waiting in the queue

private Integer queueSize;

// Average execution time of the tasks run by the pool

private long avgExecuteTime;

// All-arguments constructor: copies each parameter into the field of the same name.
public ThreadPoolDetailInfo(String threadPoolName, Integer poolSize, Integer corePoolSize, Integer largestPoolSize, Integer maximumPoolSize, long completedTaskCount, Integer active, long task, long keepAliveTime, int activePercent, Integer queueCapacity, Integer queueSize, long avgExecuteTime) {

this.threadPoolName = threadPoolName;

this.poolSize = poolSize;

this.corePoolSize = corePoolSize;

this.largestPoolSize = largestPoolSize;

this.maximumPoolSize = maximumPoolSize;

this.completedTaskCount = completedTaskCount;

this.active = active;

this.task = task;

this.keepAliveTime = keepAliveTime;

this.activePercent = activePercent;

this.queueCapacity = queueCapacity;

this.queueSize = queueSize;

this.avgExecuteTime = avgExecuteTime;

}

}

线程池实现类

在我的项目中,将线程池依次划分为high、normal、low、single四种线程池类型。不同优先级的任务将会被submit到不同的线程池中执行。在业务中有判断线程池中queue的长度来决定是否投递任务,由于没有相应的拒绝策略,所以队列不设置长度。

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import com.*.newThread.ThreadPoolUtil;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.beans.factory.annotation.Value;

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

import java.util.concurrent.LinkedBlockingQueue;

import java.util.concurrent.ThreadFactory;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

/**
 * Creates and exposes the application's thread pools: normal, high-priority, low-priority and a
 * single-threaded pool. As described by the original author: collection-type jobs use the
 * low-priority pool, regular business work uses the normal pool, and API requests triggered
 * manually by users use the high-priority pool.
 *
 * <p>All pools are created through {@link ThreadPoolUtil#creatThreadPool}, which also registers
 * them for monitoring. Each pool uses an unbounded {@link LinkedBlockingQueue} and a keep-alive
 * time of 0 ms.
 *
 * <p>NOTE(review): the high-priority pool is sized with the normal-level properties; no dedicated
 * high-level properties are injected here. Confirm this is intended.
 *
 * <p>NOTE(review): nothing in this class shuts the pools down; consider a {@code @PreDestroy}
 * hook if the application context can be closed without terminating the JVM.
 *
 * @author kantlin
 */
@Component
public class ExecutorThreadPoolManager {

    @Autowired
    private ThreadPoolUtil threadPoolUtil;

    @Value("${thread_pool_normal_level_thread_max_num}")
    private Integer normalLevelThreadPoolThreadMaxNum;

    @Value("${thread_pool_normal_level_core_thread_num}")
    private Integer normalLevelThreadPoolCoreThreadNum;

    @Value("${thread_pool_low_level_thread_max_num}")
    private Integer lowLevelThreadPoolThreadMaxNum;

    @Value("${thread_pool_low_level_core_thread_num}")
    private Integer lowLevelThreadPoolCoreThreadNum;

    private ThreadPoolExecutor normalThreadPoolExecutor;

    private ThreadPoolExecutor highPriorityExecutor;

    private ThreadPoolExecutor lowPriorityExecutor;

    private ThreadPoolExecutor singleThreadPoolExecutor;

    /** Builds the four pools once the configuration values have been injected. */
    @PostConstruct
    public void initExecutor() {
        normalThreadPoolExecutor = createPool("normal_task_thread_%d", "normal_level_thread_pool",
                normalLevelThreadPoolCoreThreadNum, normalLevelThreadPoolThreadMaxNum);
        highPriorityExecutor = createPool("high_priority_level_task_thread_%d", "high_level_thread_pool",
                normalLevelThreadPoolCoreThreadNum, normalLevelThreadPoolThreadMaxNum);
        lowPriorityExecutor = createPool("low_priority_level_task_thread_%d", "low_level_thread_pool",
                lowLevelThreadPoolCoreThreadNum, lowLevelThreadPoolThreadMaxNum);
        singleThreadPoolExecutor = createPool("single_task_thread_%d", "single_level_thread_pool", 1, 1);
    }

    /**
     * Creates one registered pool with an unbounded queue and a 0 ms keep-alive time.
     *
     * @param threadNameFormat name format for worker threads ({@code %d} is the thread index)
     * @param poolName         name under which the pool is registered
     * @param corePoolSize     core number of threads
     * @param maximumPoolSize  maximum number of threads
     * @return the created pool
     */
    private ThreadPoolExecutor createPool(String threadNameFormat, String poolName,
            int corePoolSize, int maximumPoolSize) {
        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat(threadNameFormat).build();
        return threadPoolUtil.creatThreadPool(corePoolSize, maximumPoolSize, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), threadFactory, poolName);
    }

    /** @return the pool for regular business tasks */
    public ThreadPoolExecutor getNormalThreadPoolExecutor() {
        return normalThreadPoolExecutor;
    }

    /** @return the pool for high-priority tasks */
    public ThreadPoolExecutor getHighPriorityExecutor() {
        return highPriorityExecutor;
    }

    /** @return the pool for low-priority tasks */
    public ThreadPoolExecutor getLowPriorityExecutor() {
        return lowPriorityExecutor;
    }

    /** @return the single-threaded pool */
    public ThreadPoolExecutor getSingleThreadPoolExecutor() {
        return singleThreadPoolExecutor;
    }
}

线程池监控接口类

import com.alibaba.fastjson.JSONObject;

import com.*.newThread.ThreadPoolDetailInfo;

import com.*.newThread.ThreadPoolMonitor;

import com.*.newThread.ThreadPoolUtil;

import com.*.thread.ExecutorThreadPoolManager;

import io.swagger.annotations.Api;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.scheduling.annotation.Scheduled;

import org.springframework.web.bind.annotation.*;

import java.math.BigDecimal;

import java.text.NumberFormat;

import java.util.*;

import java.util.concurrent.TimeUnit;

/**
 * REST endpoints (under {@code api/threadpool}) that expose the state of every pool registered
 * in {@link ThreadPoolUtil}. The detail endpoint is also run on a cron schedule so that the same
 * snapshot is written to the log periodically.
 *
 * <p>Created 2021/12/17 14:53.
 *
 * @author kantlin
 */
@Api(description = "线程池监控接口")
@RestController
@RequestMapping(value = "api/threadpool")
public class ThreadPoolController {

    private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolController.class);

    @Autowired
    private ExecutorThreadPoolManager threadPool;

    @Autowired
    private ThreadPoolUtil threadPoolUtil;

    /**
     * Lists the names of all registered thread pools.
     *
     * @return pool names; empty when no pool is registered
     */
    @GetMapping(value = "/getThreadPools")
    public List<String> getThreadPools() {
        return new ArrayList<>(registeredPools().keySet());
    }

    /**
     * Collects a detail snapshot of every registered pool and logs it as JSON.
     *
     * <p>Serves the GET endpoint and is also triggered by the cron expression in
     * {@code thread.poll.status.cron}; when run by the scheduler the return value is unused.
     *
     * @return one entry per registered pool; empty when no pool is registered
     */
    @GetMapping(value = "/getThreadPoolListInfo")
    @Scheduled(cron = "${thread.poll.status.cron}")
    public List<ThreadPoolDetailInfo> getThreadPoolListInfo() {
        List<ThreadPoolDetailInfo> detailInfoList = new ArrayList<>();
        for (Map.Entry<String, ThreadPoolMonitor> entry : registeredPools().entrySet()) {
            detailInfoList.add(this.threadPoolInfo(entry.getValue(), entry.getKey()));
        }
        LOGGER.info("Execute details of cuurent thread poll:{}", JSONObject.toJSONString(detailInfoList));
        return detailInfoList;
    }

    /**
     * Typed view of the pool registry.
     *
     * <p>The cast is safe because the registry is only filled by
     * {@code ThreadPoolUtil.creatThreadPool}, which stores a {@link ThreadPoolMonitor} under a
     * {@code String} pool name.
     */
    @SuppressWarnings("unchecked")
    private Map<String, ThreadPoolMonitor> registeredPools() {
        return this.threadPoolUtil.getThreadPoolExecutorHashMap();
    }

    /**
     * Builds the detail snapshot for one pool.
     *
     * @param threadPool     pool to inspect
     * @param threadPoolName name to report for the pool
     * @return snapshot of the pool's configuration and counters
     */
    private ThreadPoolDetailInfo threadPoolInfo(ThreadPoolMonitor threadPool, String threadPoolName) {
        // Read each counter once so the derived values match the reported ones.
        int activeCount = threadPool.getActiveCount();
        int maximumPoolSize = threadPool.getMaximumPoolSize();
        long completedTaskCount = threadPool.getCompletedTaskCount();

        // Only finished tasks contribute to the total, so average over completed tasks rather
        // than over all scheduled tasks (which would include queued and running ones).
        long avgExecuteTime = completedTaskCount == 0L ? 0L : threadPool.getTotalDiff() / completedTaskCount;

        // The queue capacity is not derived from the pool; it is always reported as 0.
        int queueCapacity = 0;

        return new ThreadPoolDetailInfo(
                threadPoolName,
                threadPool.getPoolSize(),
                threadPool.getCorePoolSize(),
                threadPool.getLargestPoolSize(),
                maximumPoolSize,
                completedTaskCount,
                activeCount,
                threadPool.getTaskCount(),
                threadPool.getKeepAliveTime(TimeUnit.MILLISECONDS),
                percentOf(activeCount, maximumPoolSize),
                queueCapacity,
                threadPool.getQueue().size(),
                avgExecuteTime);
    }

    /**
     * Returns {@code part / whole} as a whole percentage, rounded half up.
     *
     * <p>Uses integer arithmetic: converting through {@code double} truncates values such as
     * {@code 0.29 * 100} (28.999...) to 28.
     *
     * @param part  numerator
     * @param whole denominator; a non-positive value yields 0
     * @return percentage rounded to the nearest integer
     */
    private static int percentOf(int part, int whole) {
        if (whole <= 0) {
            return 0;
        }
        return (int) ((200L * part + whole) / (2L * whole));
    }
}

运行结果

上面controller中的方法除了可以通过接口进行暴露外,还设置了定时任务定期的打印到日志中。方便对系统状态进行排查。

[

{

"active": 0,

"activePercent": 0,

"avgExecuteTime": 0,

"completedTaskCount": 0,

"corePoolSize": 20,

"keepAliveTime": 0,

"largestPoolSize": 0,

"maximumPoolSize": 20,

"poolSize": 0,

"queueCapacity": 0,

"queueSize": 0,

"task": 0,

"threadPoolName": "high_level_thread_pool"

},

{

"active": 0,

"activePercent": 0,

"avgExecuteTime": 0,

"completedTaskCount": 0,

"corePoolSize": 33,

"keepAliveTime": 0,

"largestPoolSize": 0,

"maximumPoolSize": 33,

"poolSize": 0,

"queueCapacity": 0,

"queueSize": 0,

"task": 0,

"threadPoolName": "low_level_thread_pool"

},

{

"active": 0,

"activePercent": 0,

"avgExecuteTime": 371,

"completedTaskCount": 14,

"corePoolSize": 20,

"keepAliveTime": 0,

"largestPoolSize": 14,

"maximumPoolSize": 20,

"poolSize": 14,

"queueCapacity": 0,

"queueSize": 0,

"task": 14,

"threadPoolName": "normal_level_thread_pool"

},

{

"active": 0,

"activePercent": 0,

"avgExecuteTime": 0,

"completedTaskCount": 0,

"corePoolSize": 1,

"keepAliveTime": 0,

"largestPoolSize": 0,

"maximumPoolSize": 1,

"poolSize": 0,

"queueCapacity": 0,

"queueSize": 0,

"task": 0,

"threadPoolName": "single_level_thread_pool"

}

]

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:OpenTrip是一个轻量级的旅游电商平台网站,基于ThinkPHP框架。
下一篇:easy_store-是 web2py 模型的电子商务/其它存储平台的支持框架
相关文章

 发表评论

暂时没有评论,来抢沙发吧~