Netty分布式NioEventLoop任务队列执行源码分析

网友投稿 558 2022-10-17

Netty分布式NioEventLoop任务队列执行源码分析

Netty分布式NioEventLoop任务队列执行源码分析

目录执行任务队列跟进runAllTasks方法:我们跟进fetchFromScheduledTaskQueue()方法回到runAllTasks(long timeoutNanos)方法中回到runAllTasks(long timeoutNanos)方法章节小结

前文传送门:NioEventLoop处理IO事件

执行任务队列

继续回到NioEventLoop的run()方法:

protected void run() {

for (;;) {

try {

switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {

case SelectStrategy.CONTINUE:

continue;

case SelectStrategy.SELECT:

//轮询io事件(1)

select(wakenUp.getAndSet(false));

if (wakenUp.get()) {

selector.wakeup();

}

default:

}

cancelledKeys = 0;

needsToSelectAgain = false;

//默认是50

final int ioRatio = this.ioRatio;

if (ioRatio == 100) {

try {

processSelectedKeys();

} finally {

runAllTasks();

}

} else {

//记录下开始时间

final long ioStartTime = System.nanoTime();

try {

//处理轮询到的key(2)

processSelectedKeys();

} finally {

//计算耗时

final long ioTime = System.nanoTime() - ioStartTime;

//执行task(3)

runAllTasks(ioTime * (100 - ioRatio) / ioRatio);

}

}

} catch (Throwable t) {

handleLoopException(t);

}

//代码省略

}

}

我们看到处理完轮询到的key之后, 首先记录下耗时, 然后通过runAllTasks(ioTime * (100 - ioRatio) / ioRatio)执行taskQueue中的任务

我们知道ioRatio默认是50, 所以执行完ioTime * (100 - ioRatio) / ioRatio后, 方法传入的值为ioTime, 也就是processSelectedKeys()的执行时间:

跟进runAllTasks方法:

protected boolean runAllTasks(long timeoutNanos) {

//定时任务队列中聚合任务

fetchFromScheduledTaskQueue();

//从普通taskQ里面拿一个任务

Runnable task = pollTask();

//task为空, 则直接返回

if (task == null) {

//跑完所有的任务执行收尾的操作

afterRunningAllTasks();

return false;

}

//如果队列不为空

//首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间)

final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;

long runTasks = 0;

long lastExecutionTime;

//执行每一个任务

for (;;) {

safeExecute(task);

//标记当前跑完的任务

runTasks ++;

//当跑完64个任务的时候, 会计算一下当前时间

if ((runTasks & 0x3F) == 0) {

//定时任务初始化到当前的时间

lastExecutionTime = ScheduledFutureTask.nanoTime();

//如果超过截止时间则不执行(nanoTime()是耗时的)

if (lastExecutionTime >= deadline) {

break;

}

}

//如果没有超过这个时间, 则继续从普通任务队列拿任务

task = pollTask();

//直到没有任务执行

if (task == null) {

//记录下最后执行时间

lastExecutionTime = ScheduledFutureTask.nanoTime();

break;

}

}

//收尾工作

afterRunningAllTasks();

this.lastExecutionTime = lastExecutionTime;

return true;

}

首先会执行fetchFromScheduledTaskQueue()这个方法, 这个方法的意思是从定时任务队列中聚合任务, 也就是将定时任务中找到可以执行的任务添加到taskQueue中

我们跟进fetchFromScheduledTaskQueue()方法

private boolean fetchFromScheduledTaskQueue() {

long nanoTime = AbstractScheduledEventExecutor.nanoTime();

//从定时任务队列中抓取第一个定时任务

//寻找截止时间为nanoTime的任务

Runnable scheduledTask = pollScheduledTask(nanoTime);

//如果该定时任务队列不为空, 则塞到普通任务队列里面

while (scheduledTask != null) {

//如果添加到普通任务队列过程中失败

if (!taskQueue.offer(scheduledTask)) {

//则重新添加到定时任务队列中

scheduledTaskQueue().add((ScheduledFutureTask>) scheduledTask);

return false;

}

//继续从定时任务队列中拉取任务

//方法执行完成之后, 所有符合运行条件的定时任务队列, 都添加到了普通任务队列中

scheduledTask = pollScheduledTask(nanoTime);

}

return true;

}

long nanoTime = AbstractScheduledEventExecutor.nanoTime()

代表从定时任务初始化到现在过去了多长时间

Runnable scheduledTask= pollScheduledTask(nanoTime)

代表从定时任务队列中拿到小于nanoTime时间的任务, 因为小于初始化到现在的时间, 说明该任务需要执行了

跟到其父类AbstractScheduledEventExecutor的pollScheduledTask(nanoTime)方法中:

protected final Runnable pollScheduledTask(long nanoTime) {

assert inEventLoop();

//拿到定时任务队列

Queue> scheduledTaskQueue = this.scheduledTaskQueue;

//peek()方法拿到第一个任务

ScheduledFutureTask> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();

if (scheduledTask == null) {

return null;

}

if (scheduledTask.deadlineNanos() <= nanoTime) {

//从队列中删除

scheduledTaskQueue.remove();

//返回该任务

return scheduledTask;

}

return null;

}

我们看到首先获得当前类绑定的定时任务队列的成员变量

如果不为空, 则通过scheduledTaskQueue.peek()弹出第一个任务

如果当前任务小于传来的时间, 说明该任务需要执行, 则从定时任务队列中删除

我们继续回到fetchFromScheduledTaskQueue()方法中:

private boolean fetchFromScheduledTaskQueue() {

long nanoTime = AbstractScheduledEventExecutor.nanoTime();

//从定时任务队列中抓取第一个定时任务

//寻找截止时间为nanoTime的任务

Runnable scheduledTask = pollScheduledTask(nanoTime);

//如果该定时任务队列不为空, 则塞到普通任务队列里面

while (scheduledTask != null) {

//如果添加到普通任务队列过程中失败

if (!taskQueue.offer(scheduledTask)) {

//则重新添加到定时任务队列中

scheduledTaskQueue().add((ScheduledFutureTask>) scheduledTask);

return false;

}

//继续从定时任务队列中拉取任务

//方法执行完成之后, 所有符合运行条件的定时任务队列, 都添加到了普通任务队列中

scheduledTask = pollScheduledTask(nanoTime);

}

return true;

}

弹出需要执行的定时任务之后, 我们通过taskQueue.offer(scheduledTask)添加到taskQueue中, 如果添加失败, 则通过

scheduledTaskQueue().add((ScheduledFutureTask>) scheduledTask)

重新添加到定时任务队列中

如果添加成功, 则通过pollScheduledTask(nanoTime)方法继续添加, 直到没有需要执行的任务

这样就将定时任务队列需要执行的任务添加到了taskQueue中

回到runAllTasks(long timeoutNanos)方法中

protected boolean runAllTasks(long timeoutNanos) {

//定时任务队列中聚合任务

fetchFromScheduledTaskQueue();

//从普通taskQ里面拿一个任务

Runnable task = pollTask();

//task为空, 则直接返回

if (task == null) {

//跑完所有的任务执行收尾的操作

afterRunningAllTasks();

return false;

}

//如果队列不为空

//首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间)

final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;

long runTasks = 0;

long lastExecutionTime;

//执行每一个任务

for (;;) {

safeExecute(task);

//标记当前跑完的任务

runTasks ++;

//当跑完64个任务的时候, 会计算一下当前时间

if ((runTasks & 0x3F) == 0) {

//定时任务初始化到当前的时间

lastExecutionTime = ScheduledFutureTask.nanoTime();

//如果超过截止时间则不执行(nanoTime()是耗时的)

if (lastExecutionTime >= deadline) {

break;

}

}

//如果没有超过这个时间, 则继续从普通任务队列拿任务

task = pollTask();

//直到没有任务执行

if (task == null) {

//记录下最后执行时间

lastExecutionTime = ScheduledFutureTask.nanoTime();

break;

}

}

//收尾工作

afterRunningAllTasks();

this.lastExecutionTime = lastExecutionTime;

return true;

}

首先通过 Runnable task = pollTask() 从taskQueue中拿一个任务

任务不为空, 则通过

final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos

计算一个截止时间, 任务的执行时间不能超过这个时间

然后在for循环中通过safeExecute(task)执行task

我们跟到safeExecute(task)中:

protected static void safeExecute(Runnable task) {

try {

//直接调用run()方法执行

task.run();

} catch (Throwable t) {

//发生异常不终止

logger.warn("A task raised an exception. Task: {}", task, t);

}

}

这里直接调用task的run()方法进行执行, 其中发生异常, 只打印一条日志, 代表发生异常不终止, 继续往下执行

回到runAllTasks(long timeoutNanos)方法

protected boolean runAllTasks(long timeoutNanos) {

//定时任务队列中聚合任务

fetchFromScheduledTaskQueue();

//从普通taskQ里面拿一个任务

Runnable task = pollTask();

//task为空, 则直接返回

if (task == null) {

//跑完所有的任务执行收尾的操作

afterRunningAllTasks();

return false;

}

//如果队列不为空

//首先算一个截止时间(+50毫秒, 因为执行任务, 不要超过这个时间)

final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;

long runTasks = 0;

long lastExecutionTime;

//执行每一个任务

for (;;) {

safeExecute(task);

//标记当前跑完的任务

runTasks ++;

//当跑完64个任务的时候, 会计算一下当前时间

if ((runTasks & 0x3F) == 0) {

//定时任务初始化到当前的时间

lastExecutionTime = ScheduledFutureTask.nanoTime();

//如果超过截止时间则不执行(nanoTime()是耗时的)

if (lastExecutionTime >= deadline) {

break;

}

}

//如果没有超过这个时间, 则继续从普通任务队列拿任务

task = pollTask();

//直到没有任务执行

if (task == null) {

//记录下最后执行时间

lastExecutionTime = ScheduledFutureTask.nanoTime();

break;

}

}

//收尾工作

afterRunningAllTasks();

this.lastExecutionTime = lastExecutionTime;

return true;

}

每次执行完task, runTasks自增

这里 if ((runTasks & 0x3F) == 0) 代表是否执行了64个任务, 如果执行了64个任务, 则会通过 lastExecutionTime = ScheduledFutureTask.nanoTime() 记录定时任务初始化到现在的时间, 如果这个时间超过了截止时间, 则退出循环

如果没有超过截止时间, 则通过 task = pollTask() 继续弹出任务执行

这里执行64个任务统计一次时间, 而不是每次执行任务都统计, 主要原因是因为获取系统时间是个比较耗时的操作, 这里是netty的一种优化方式

如果没有task需要执行, 则通过afterRunningAllTasks()做收尾工作, 最后记录下最后的执行时间

以上就是有关执行任务队列的相关逻辑

章节小结

本章学习了有关NioEventLoopGroup的创建, NioEventLoop的创建和启动, 以及多路复用器的轮询处理和task执行的相关逻辑, 通过本章学习, 我们应该掌握如下内容:

1.  NioEventLoopGroup如何选择分配NioEventLoop

2.  NioEventLoop如何开启

3.  NioEventLoop如何进行select操作

4.  NioEventLoop如何执行task

以上就是Netty分布式NioEventLoop任务队列执行源码分析的详细内容,更多关于Netty分布式NioEventLoop执行任务队列的资料请关注我们其它相关文章!

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Spring-kafka入门使用Demo(超详细)---------无授权认证方式
下一篇:swoft框架通用缓存组件,基于swoft框架的aop实现
相关文章

 发表评论

暂时没有评论,来抢沙发吧~