探索flutter框架开发的app在移动应用市场的潜力与挑战
1028
2022-10-07
Dubbo 源码分析之线程池
文章目录
前言Dubbo 线程池分类
Dubbo 线程工厂线程池拒绝策略CachedThreadPoolFixedThreadPoolLimitedThreadPoolEagerThreadPool
前言
相信学过 netty 的都知道,每一个 ChannelHandler 都是通过它的 EventLoop(I/O线程) 来处理传递给它的事件,所以至关重要的是不要阻塞这个线程,因为这会对整体的 I/O 产生负面的影响。Dubbo 亦是如此,所以 Dubbo 定义了一些线程池,来供我们异步处理事件。
Dubbo 线程池分类
在 Dubbo-2.6.5 版本中共有三个线程池
CachedThreadPoolFixedThreadPoolLimitedThreadPool
而在 GitHub 上 Dubbo 的最新源码中又提供了一个新的线程池
EagerThreadPool
是不是很熟悉,除了 EagerThreadPool,别的线程池好像 java.util.concurrent.Executors 都为我们提供了。只不过 Dubbo 自己又实现了一下,调整下自定义参数(对线程池不熟悉的请查看为这篇文章:JAVA线程池ThreadPoolExecutor详解)
Dubbo 线程工厂
在看线程池之前先看一下 Dubbo 自定义的线程池工厂
public class NamedThreadFactory implements ThreadFactory { // 自增 线程池序号 protected static final AtomicInteger POOL_SEQ = new AtomicInteger(1); // 线程池中 线程自增序号 protected final AtomicInteger mThreadNum = new AtomicInteger(1); // 线程池名称前缀 protected final String mPrefix; // 是否设置为守护线程 protected final boolean mDaemon; // 所在线程组 protected final ThreadGroup mGroup; // 默认非守护线程 即:用户线程 public NamedThreadFactory() { this("pool-" + POOL_SEQ.getAndIncrement(), false); } public NamedThreadFactory(String prefix) { this(prefix, false); } public NamedThreadFactory(String prefix, boolean daemon) { mPrefix = prefix + "-thread-"; mDaemon = daemon; SecurityManager s = System.getSecurityManager(); mGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup(); } @Override public Thread newThread(Runnable runnable) { String name = mPrefix + mThreadNum.getAndIncrement(); Thread ret = new Thread(mGroup, runnable, name, 0); ret.setDaemon(mDaemon); return ret; } public ThreadGroup getThreadGroup() { return mGroup; }}
该线程工厂为我们的线程池创建了名称,为线程创建了名称,方便我们查看堆栈信息时进行 debug ,线程为守护/用户的决定权由使用者决定。 上面的线程工厂是在 Dubbo-2.6.5 版本中的源码 其实在 GitHub 上 Dubbo 的最新源码中为该线程工厂又封装了一层
public class NamedInternalThreadFactory extends NamedThreadFactory { public NamedInternalThreadFactory() { super(); } public NamedInternalThreadFactory(String prefix) { super(prefix, false); } public NamedInternalThreadFactory(String prefix, boolean daemon) { super(prefix, daemon); } @Override public Thread newThread(Runnable runnable) { String name = mPrefix + mThreadNum.getAndIncrement(); InternalThread ret = new InternalThread(mGroup, runnable, name, 0); ret.setDaemon(mDaemon); return ret; }}
看着好像并没有做什么,其实细节都在 newThread 方法里,会发现我们在父类 NamedThreadFactory 中是直接使用的 new Thread ,而在这里使用的是 new InternalThread,有什么区别呢?仔细看看 InternalThread 也就是重新封装了一下 Thread ,好像什么都没做。其实作用这这里是看不出来的,看过 Netty 源码的可能知道 Netty 有个 FastThreadLocal FastThreadLocalThread,Dubbo 在这里也是如此,借鉴了 Netty 源码。要说 FastThreadLocal 和 ThreadLocal 什么区别,请移步我的这篇文章Java源码之ThreadLocal,在这里我就不多废话了。
线程池拒绝策略
Dubbo 也重写了线程池拒绝策略,主要操作就是在线程池拒绝时保存堆栈信息到文件。具体可以看我分析的这篇文章 Dubbo 源码分析之自定义线程池拒绝策略,这里不再详细介绍。
下面开始看线程池
CachedThreadPool
public class CachedThreadPool implements ThreadPool { public Executor getExecutor(URL url) { //线程池名称前缀 默认:Dubbo String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); //核心线程数 默认:0 int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); // 最大线程数 默认:Integer.MAX_VALUE int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE); //阻塞队列大小 默认:0 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); //线程存活时间 默认:60s int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE); return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue
所谓 CachedThreadPool,就是线程池会伸缩,当事件较多时创建新的线程,事件较少时,超出一定时间回收线程。
注意:其实此 CachedThreadPool 并不是正经的 cache,可能大家发现了该线程池的阻塞队列虽然默认是 SynchronousQueue,但是如果用户配置了 queues 变量,且其值较大,使用的阻塞队列就是 LinkedBlockingQueue,此时一旦 corethreads 再使用默认值0,就会导致处理事件阻塞。
FixedThreadPool
public class FixedThreadPool implements ThreadPool { public Executor getExecutor(URL url) { String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS); int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue
FixedThreadPool 表示核心线程和最大线程大小都为固定值,在 Dubbo 中的确也是这么处理的,但是这里还有要注意的地方。
注意:如果使用 fixed 线程池,默认的线程池数量为200,默认阻塞队列大小为0,默认使用的阻塞队列为SynchronousQueue。如果你的业务事件并发较高或者处理时间较长,请适当调整阻塞队列大小,即 queues 变量,否则会导致大量请求被丢弃。该线程池也是 Dubbo 默认使用的线程池,估计出事的挺多。
LimitedThreadPool
public class LimitedThreadPool implements ThreadPool { public Executor getExecutor(URL url) { String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS); int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue
这个线程池就比较有意思的,该线程池中的线程数可以一直增长到上限,永不回收,其实永不回收是个伪概念,查看代码可以发现,该线程设置线程的回收时间限制为 Long.MAX_VALUE,可以理解为永久不回收。这个如果说要注意的那就只有一点,因为线程池是不回收的,所以 threads 变量即线程最大限制的值不能太大,使用默认 200 即可,避免 OOM.
EagerThreadPool
注:此线程池在dubbo-2.6.5及其以下没有实现
首先介绍一下该线程池:线程池中的所有核心线程都在忙碌时,此时如果再添加新的任务不会放入阻塞队列,而且创建新的线程,直到达到最大线程限制,此时如果还有任务,才会放入阻塞队列。
public class EagerThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE); int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE); // init queue and executor TaskQueue
大致规规矩矩,和上面一样,里面好像有两个我们没有见过的类 TaskQueue 和 EagerThreadPoolExecutor 。
先看一下自定义阻塞队列 TaskQueue
public class TaskQueue
最主要的逻辑就是在重写的 offer 方法里面,通过重写该方法可以在我们向线程池提交任务时,伪装阻塞队列已满,来使线程池新建线程执行任务。
贴上一段 ThreadPoolExecutor 类的 execute 代码
public void execute(Runnable command) { /*如果提交的任务为null 抛出空指针异常*/ if (command == null) throw new NullPointerException(); int c = ctl.get(); /*如果当前的任务数小于等于设置的核心线程大小,那么调用addWorker直接执行该任务*/ if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } /*如果当前的任务数大于设置的核心线程大小,而且当前的线程池状态时运行状态,那么向阻塞队列中添加任务*/ if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } /*如果向队列中添加失败,那么就新开启一个线程来执行该任务*/ //tip:最主要的就是这里 当向阻塞队列插入失败时,会直接调用 addWorker 方法,创建新的线程执行任务 else if (!addWorker(command, false)) reject(command); }
想具体了解细节的可以查看我的这篇博客 线程池(ThreadPoolExecutor)源码分析之如何保证核心线程不被销毁的。 通过 ThreadPoolExecutor 源码可以发现,当向阻塞队列插入失败时,会直接调用 addWorker 方法,创建新的线程执行任务。现在就和重写的 TaskQueue 的 offer 方法对上了。
那就继续看 EagerThreadPoolExecutor 类。
// 继承ThreadPoolExecutorpublic class EagerThreadPoolExecutor extends ThreadPoolExecutor { // 任务数量 private final AtomicInteger submittedTaskCount = new AtomicInteger(0); public EagerThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, TaskQueue
重写的线程池也很简单,提供一个当前任务数量的接口,供 TaskQueue 使用。execute 方法也很好理解。可能有的小伙伴要问了,为什么要在 execute 方法中捕获 RejectedExecutionException 异常呢。这个还要回到 ThreadPoolExecutor 源码中
//省略无关代码 else if (!addWorker(command, false)) reject(command);//省略无关代码
这句代码,在 addWorker 失败时,会抛出 RejectedExecutionException 异常。在未重写的线程池中抛出该异常的条件是:阻塞队列已满,线程数已达到最大限制。重写后的 TaskQueue 阻塞队列中 offer 还会在线程池中线程数量小于最大限制时返回 false.即使返回 false 新的阻塞队列也不代表已经满了,所以需要 retryOffer
//省略无关代码 if (currentPoolThreadSize < executor.getMaximumPoolSize()) { return false; }//省略无关代码
线程池分析结束了,看了下提交记录 EagerThreadPool 好像还是当初我们公司的前师兄写的。目前好像是 Dubbo 的 PPMC。厉害的很啊,大家一起努力学习啊。coding
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~