Dubbo 源码分析之线程池

网友投稿 1028 2022-10-07

Dubbo 源码分析之线程池

Dubbo 源码分析之线程池

文章目录

​​前言​​​​Dubbo 线程池分类​​

​​Dubbo 线程工厂​​​​线程池拒绝策略​​​​CachedThreadPool​​​​FixedThreadPool​​​​LimitedThreadPool​​​​EagerThreadPool​​

前言

相信学过 ​​netty​​​ 的都知道,每一个 ​​ChannelHandler​​​ 都是通过它的 ​​EventLoop(I/O线程)​​​ 来处理传递给它的事件,所以至关重要的是不要阻塞这个线程,因为这会对整体的 ​​I/O​​​ 产生负面的影响。​​Dubbo​​​ 亦是如此,所以 ​​Dubbo​​ 定义了一些线程池,来供我们异步处理事件。

Dubbo 线程池分类

在 ​​Dubbo-2.6.5​​ 版本中共有三个线程池

​​CachedThreadPool​​​​FixedThreadPool​​​​LimitedThreadPool​​

而在 GitHub 上 ​​Dubbo​​ 的最新源码中又提供了一个新的线程池

​​EagerThreadPool​​

是不是很熟悉,除了 ​​EagerThreadPool​​​,别的线程池好像 ​​java.util.concurrent.Executors​​​ 都为我们提供了。只不过 ​​Dubbo​​​ 自己又实现了一下,调整下自定义参数(对线程池不熟悉的请查看为这篇文章:​​JAVA线程池ThreadPoolExecutor详解​​)

Dubbo 线程工厂

在看线程池之前先看一下 ​​Dubbo​​ 自定义的线程池工厂

public class NamedThreadFactory implements ThreadFactory { // 自增 线程池序号 protected static final AtomicInteger POOL_SEQ = new AtomicInteger(1); // 线程池中 线程自增序号 protected final AtomicInteger mThreadNum = new AtomicInteger(1); // 线程池名称前缀 protected final String mPrefix; // 是否设置为守护线程 protected final boolean mDaemon; // 所在线程组 protected final ThreadGroup mGroup; // 默认非守护线程 即:用户线程 public NamedThreadFactory() { this("pool-" + POOL_SEQ.getAndIncrement(), false); } public NamedThreadFactory(String prefix) { this(prefix, false); } public NamedThreadFactory(String prefix, boolean daemon) { mPrefix = prefix + "-thread-"; mDaemon = daemon; SecurityManager s = System.getSecurityManager(); mGroup = (s == null) ? Thread.currentThread().getThreadGroup() : s.getThreadGroup(); } @Override public Thread newThread(Runnable runnable) { String name = mPrefix + mThreadNum.getAndIncrement(); Thread ret = new Thread(mGroup, runnable, name, 0); ret.setDaemon(mDaemon); return ret; } public ThreadGroup getThreadGroup() { return mGroup; }}

该线程工厂为我们的线程池创建了名称,为线程创建了名称,方便我们查看堆栈信息时进行 ​​debug​​​ ,线程为守护/用户的决定权由使用者决定。 上面的线程工厂是在 ​​​Dubbo-2.6.5​​​ 版本中的源码 其实在 GitHub 上 ​​​Dubbo​​ 的最新源码中为该线程工厂又封装了一层

public class NamedInternalThreadFactory extends NamedThreadFactory { public NamedInternalThreadFactory() { super(); } public NamedInternalThreadFactory(String prefix) { super(prefix, false); } public NamedInternalThreadFactory(String prefix, boolean daemon) { super(prefix, daemon); } @Override public Thread newThread(Runnable runnable) { String name = mPrefix + mThreadNum.getAndIncrement(); InternalThread ret = new InternalThread(mGroup, runnable, name, 0); ret.setDaemon(mDaemon); return ret; }}

看着好像并没有做什么,其实细节都在 ​​newThread​​​ 方法里,会发现我们在父类 ​​NamedThreadFactory​​​ 中是直接使用的 ​​new Thread​​​ ,而在这里使用的是 ​​new InternalThread​​​,有什么区别呢?仔细看看 ​​InternalThread​​​ 也就是重新封装了一下 ​​Thread​​​ ,好像什么都没做。其实作用这这里是看不出来的,看过 ​​Netty​​​ 源码的可能知道 ​​Netty​​​ 有个 ​​FastThreadLocal​​​ ​​FastThreadLocalThread​​​,​​Dubbo​​​ 在这里也是如此,借鉴了 ​​Netty​​​ 源码。要说 ​​FastThreadLocal​​​ 和 ​​ThreadLocal​​​ 什么区别,请移步我的这篇文章​​Java源码之ThreadLocal​​,在这里我就不多废话了。

线程池拒绝策略

Dubbo 也重写了线程池拒绝策略,主要操作就是在线程池拒绝时保存堆栈信息到文件。具体可以看我分析的这篇文章 ​​Dubbo 源码分析之自定义线程池拒绝策略​​,这里不再详细介绍。

下面开始看线程池

CachedThreadPool

public class CachedThreadPool implements ThreadPool { public Executor getExecutor(URL url) { //线程池名称前缀 默认:Dubbo String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); //核心线程数 默认:0 int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); // 最大线程数 默认:Integer.MAX_VALUE int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE); //阻塞队列大小 默认:0 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); //线程存活时间 默认:60s int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE); return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue() : (queues < 0 ? new LinkedBlockingQueue() : new LinkedBlockingQueue(queues)), new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); }}

所谓 ​​CachedThreadPool​​,就是线程池会伸缩,当事件较多时创建新的线程,事件较少时,超出一定时间回收线程。

​​注意​​​:其实此 ​​CachedThreadPool​​​ 并不是正经的 ​​cache​​​,可能大家发现了该线程池的阻塞队列虽然默认是 ​​SynchronousQueue​​​,但是如果用户配置了 ​​queues​​​ 变量,且其值较大,使用的阻塞队列就是 ​​LinkedBlockingQueue​​​,此时一旦 ​​corethreads​​​ 再使用默认值​​0​​,就会导致处理事件阻塞。

FixedThreadPool

public class FixedThreadPool implements ThreadPool { public Executor getExecutor(URL url) { String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS); int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue() : (queues < 0 ? new LinkedBlockingQueue() : new LinkedBlockingQueue(queues)), new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); }}

​​FixedThreadPool​​​ 表示核心线程和最大线程大小都为固定值,在 ​​Dubbo​​ 中的确也是这么处理的,但是这里还有要注意的地方。

​​注意​​​:如果使用 ​​fixed​​​ 线程池,默认的线程池数量为​​200​​​,默认阻塞队列大小为​​0​​​,默认使用的阻塞队列为​​SynchronousQueue​​​。如果你的业务事件并发较高或者处理时间较长,请适当调整阻塞队列大小,即 ​​queues​​​ 变量,否则会导致大量请求被丢弃。该线程池也是 ​​Dubbo​​ 默认使用的线程池,估计出事的挺多。

LimitedThreadPool

public class LimitedThreadPool implements ThreadPool { public Executor getExecutor(URL url) { String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS); int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue() : (queues < 0 ? new LinkedBlockingQueue() : new LinkedBlockingQueue(queues)), new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); }}

这个线程池就比较有意思的,该线程池中的线程数可以一直增长到上限,永不回收,其实永不回收是个伪概念,查看代码可以发现,该线程设置线程的回收时间限制为 ​​Long.MAX_VALUE​​​,可以理解为永久不回收。这个如果说要注意的那就只有一点,因为线程池是不回收的,所以 ​​threads​​​ 变量即线程最大限制的值不能太大,使用默认 ​​200​​​ 即可,避免 ​​OOM​​.

EagerThreadPool

注:此线程池在dubbo-2.6.5及其以下没有实现

首先介绍一下该线程池:线程池中的所有核心线程都在忙碌时,此时如果再添加新的任务不会放入阻塞队列,而且创建新的线程,直到达到最大线程限制,此时如果还有任务,才会放入阻塞队列。

public class EagerThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE); int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE); // init queue and executor TaskQueue taskQueue = new TaskQueue(queues <= 0 ? 1 : queues); EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, taskQueue, new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); taskQueue.setExecutor(executor); return executor; }}

大致规规矩矩,和上面一样,里面好像有两个我们没有见过的类 ​​TaskQueue​​​ 和 ​​EagerThreadPoolExecutor​​ 。

先看一下自定义阻塞队列 ​​TaskQueue​​

public class TaskQueue extends LinkedBlockingQueue { private static final long serialVersionUID = -2635853580887179627L; private EagerThreadPoolExecutor executor; public TaskQueue(int capacity) { super(capacity); } public void setExecutor(EagerThreadPoolExecutor exec) { executor = exec; } // 重写 offer 操作 @Override public boolean offer(Runnable runnable) { if (executor == null) { throw new RejectedExecutionException("The task queue does not have executor!"); } int currentPoolThreadSize = executor.getPoolSize(); // 小于核心线程数 直接调用父类offer if (executor.getSubmittedTaskCount() < currentPoolThreadSize) { return super.offer(runnable); } // 返回失败使线程池新建线程来执行任务 伪装阻塞队列已满 if (currentPoolThreadSize < executor.getMaximumPoolSize()) { return false; } // currentPoolThreadSize >= max return super.offer(runnable); } // 重试 offer 操作 public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException { if (executor.isShutdown()) { throw new RejectedExecutionException("Executor is shutdown!"); } return super.offer(o, timeout, unit); }}

最主要的逻辑就是在重写的 ​​offer​​ 方法里面,通过重写该方法可以在我们向线程池提交任务时,伪装阻塞队列已满,来使线程池新建线程执行任务。

贴上一段 ​​ThreadPoolExecutor​​​ 类的 ​​execute​​ 代码

public void execute(Runnable command) { /*如果提交的任务为null 抛出空指针异常*/ if (command == null) throw new NullPointerException(); int c = ctl.get(); /*如果当前的任务数小于等于设置的核心线程大小,那么调用addWorker直接执行该任务*/ if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } /*如果当前的任务数大于设置的核心线程大小,而且当前的线程池状态时运行状态,那么向阻塞队列中添加任务*/ if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } /*如果向队列中添加失败,那么就新开启一个线程来执行该任务*/ //tip:最主要的就是这里 当向阻塞队列插入失败时,会直接调用 addWorker 方法,创建新的线程执行任务 else if (!addWorker(command, false)) reject(command); }

想具体了解细节的可以查看我的这篇博客 ​​线程池(ThreadPoolExecutor)源码分析之如何保证核心线程不被销毁的​​​。 通过 ​​​ThreadPoolExecutor​​​ 源码可以发现,当向阻塞队列插入失败时,会直接调用 ​​addWorker​​​ 方法,创建新的线程执行任务。现在就和重写的 ​​TaskQueue​​​ 的 ​​offer​​ 方法对上了。

那就继续看 ​​EagerThreadPoolExecutor​​ 类。

// 继承ThreadPoolExecutorpublic class EagerThreadPoolExecutor extends ThreadPoolExecutor { // 任务数量 private final AtomicInteger submittedTaskCount = new AtomicInteger(0); public EagerThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, TaskQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } // 返回正在执行的任务数量 public int getSubmittedTaskCount() { return submittedTaskCount.get(); } // ThreadPoolExecutor 模版方法 任务执行完成之后会调用 @Override protected void afterExecute(Runnable r, Throwable t) { submittedTaskCount.decrementAndGet(); } // 重写 execute 方法 @Override public void execute(Runnable command) { if (command == null) { throw new NullPointerException(); } // do not increment in method beforeExecute! submittedTaskCount.incrementAndGet(); try { super.execute(command); } catch (RejectedExecutionException rx) { // 提交任务拒绝时 直接放入阻塞队列 final TaskQueue queue = (TaskQueue) super.getQueue(); try { if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) { submittedTaskCount.decrementAndGet(); throw new RejectedExecutionException("Queue capacity is full.", rx); } } catch (InterruptedException x) { submittedTaskCount.decrementAndGet(); throw new RejectedExecutionException(x); } } catch (Throwable t) { // decrease any way submittedTaskCount.decrementAndGet(); throw t; } }}

重写的线程池也很简单,提供一个当前任务数量的接口,供 ​​TaskQueue​​​ 使用。​​execute​​​ 方法也很好理解。可能有的小伙伴要问了,为什么要在 ​​execute​​​ 方法中捕获 ​​RejectedExecutionException​​​ 异常呢。这个还要回到 ​​ThreadPoolExecutor​​ 源码中

//省略无关代码 else if (!addWorker(command, false)) reject(command);//省略无关代码

这句代码,在 ​​addWorker​​​ 失败时,会抛出 ​​RejectedExecutionException​​​ 异常。在未重写的线程池中抛出该异常的条件是:阻塞队列已满,线程数已达到最大限制。重写后的 ​​TaskQueue​​​ 阻塞队列中 ​​offer​​​ 还会在线程池中线程数量小于最大限制时返回 ​​false​​​.即使返回 ​​false​​​ 新的阻塞队列也不代表已经满了,所以需要 ​​retryOffer​​

//省略无关代码 if (currentPoolThreadSize < executor.getMaximumPoolSize()) { return false; }//省略无关代码

线程池分析结束了,看了下提交记录 ​​EagerThreadPool​​​ 好像还是当初我们公司的前师兄写的。目前好像是 ​​Dubbo​​​ 的 ​​PPMC​​。厉害的很啊,大家一起努力学习啊。coding

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:filter怎样全局使用(filter如何使用)
下一篇:关于 mysql 的共享锁 排它锁以及锁的实现方式 行锁 间隙锁 Next-Key Lock
相关文章

 发表评论

暂时没有评论,来抢沙发吧~