Swoole/Process API 实现的消息处理框架

网友投稿 568 2022-10-25

Swoole/Process API 实现的消息处理框架

Swoole/Process API 实现的消息处理框架

Jober 工作界面:

产生背景:

项目中经常会有后台运行任务的需求,比如发送邮件时,因为要连接邮件服务器,往往需要5-10秒甚至更长时间,如果能先给用户一个成功的提示信息,然后在后台慢慢处理发送邮件的操作,显然会有更好的用户体验。 当然实际情况远不止这一点,我们可以通过Jober的配合完成 “订单超时关闭、自动评论、QQ邮箱定时发送功能等等”。

Jober 是什么

通过Swoole 官方提供Swoole/Process API才得以完成,Jober 相当于每个子进程。如果你开启8个Jober,那么就是8个进程同时消费。

核心特点

命令行:快速实现消息中间件消费、支持守护进程、常驻内存;自动加载:基于 PSR-4 ,完全使用 Composer 构建;模块化:支持 Composer ,可以很方便的使用第三方库;客户端:支持Beanstalk, 理论上来说还支持RabbitMQ并且可以多进程爬虫等等;Jober 管理器特性: Jober 管理器可单独分配,每个队列分配不同Worker数量完成任务;具体:配置文件Queue每个Worker进程均有一个定时器,可通过配置文件Crontab(毫秒级) 属性完成;每个Worker进程完程5000个任务,平滑退出重新添加进程负责这个队列;[ 防止内存泄漏 ]Worker进程异常退出,Master进程自动新建W接管当前队列;Jober 管理器接收到用户STOP信号,逐个平滑退出;Jober 服务已运行状态中,添加新队列并运行;举个栗子:jober new mailerbox:2jober new mailerbox:2,给队列添加进程提高运行效率;

环境要求

PHP >= 7.2 语言版本号Swoole >= 4.0.0 通信扩展Seaslog >= 2.0.2 日志扩展

快速开始

推荐使用 composer 安装。

composer create-project jober/jober

启动Jober:

$> cd Jober/app $> php jober start

如果你足够幸运的话,那么将会看到如下界面:

_ _ _ | | ___ | |__ ___ _ _ | || | / _ \ | '_ \ / -_) | '_| \__/ \___/ |_.__/ \___| |_| Jober: 1.0.0, php: 7.2.18, swoole: 4.3.4 2019-06-03 11:31:33 Jober[INFO]#> Jober has been started. (master PID: 20103) 2019-06-03 11:31:33 Jober[INFO]#> Worker(20104) started ¦ tube(mailerbox) watched ¦ crontab(2/s). 2019-06-03 11:31:33 Jober[INFO]#> Worker(20105) started ¦ tube(mailerbox) watched ¦ crontab(2/s). 2019-06-03 11:31:33 Jober[INFO]#> Worker(20106) started ¦ tube(mailerbox) watched ¦ crontab(2/s). 2019-06-03 11:31:33 Jober[INFO]#> Worker(20107) started ¦ tube(mailerbox) watched ¦ crontab(2/s).

Jober Command:

Usage: php jober [options] Options: start/Start debug mode. new /Add multiple processes to the queue. start -d/Daemon mode runs services. stop/Smooth stop services. restart/Smooth stop and started service

Jober 队列配置参数:

tubejobClassworker_numcrontab
MailerBoxapp\\Jobs\\MailboxJob42
Orderapp\\Jobs\\OrderJob83
队列名称自定义Job类 数量 扫描频率

Jober DEBUG 参数说明:

workerstatustubemax_taskscrontab
pid(15438) exited/reboot tube(mailerbox)max_tasks(2)2
进程号重启/退出负责管道/队列(mailerbox)完成2个任务退出重启每2秒读取一次

解读上面语句含义:

worker(15438): 负责队列(mailerbox), max(2)完成两个任务后自动退出并重启,自动创建进程接管这个队列. worker(15438): 负责队列(mailerbox), 2秒查看一次队列,是否有数据需要处理.

邮箱异步发送案例:

watchTube('MailerBox') ->reserve(3); $data = json_decode($job->payload, true); if (!empty($data)) { if (MailBox::send($data)) { //todo:: 邮件发送逻辑// (new MailerService())->log($data, MailStats::SENT_SUCCESS); # 发送成功记录一条日志 Console::println(date('Y-m-d ')." 异步邮件已发送 -> 内容:".$data['body'] ,ColorText::FG_CYAN); // 删除队列中的已发送邮件 $job->delete(); Console::println(date('Y-m-d ')." 异步队列中删除 -> 内容:".$data['to'] ,ColorText::FG_RED); return true; }// (new MailerService())->log($data, MailStats::SENT_ERROR); # 发送失败记录日志 } } catch (\Exception $e) { /** * 1. 异常必须捕捉,报错会不断重启Jober * 2. 记录到日志文件 */// Log::error("异常原因:".$e->getMessage()); } return false; }}

License

Apache License Version 2.0, http://apache.org/licenses/

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Asity- JVM 上各种 Web 框架的抽象层
下一篇:mybatis多表查询的实现(xml方式)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~