qrpc, 轻量级的通用长链接框架

网友投稿 1051 2022-10-24

qrpc, 轻量级的通用长链接框架

qrpc, 轻量级的通用长链接框架

qrpc, 轻量级的通用长链接框架

qrpc 提供完整的服务端及客户端功能,并支持以下4种特性使得rpc变得极为容易:

阻塞 或 非阻塞流式 或 非流式主动推送双向调用

默认是阻塞模式,也就是同一个长链接的请求是串行处理,类似http/1.1,但是通过微小的改动就可以切换到其他模式。

此外,qrpc还提供了桥接网络的特性,该特性使得多协议支持不费吹灰之力,同样的一套代码可以同时跑在tcp、websocket及任意已有的协议之下,详情参考ws/README.md;以及代码生成功能,使得服务的注册和调用极大地便利化,详情参考codegen demo。

这里是ppt版。

协议设计

qrpc提供请求->响应以及主动推送两大类的交互能力。

请求->响应又分为「阻塞 或 非阻塞」以及「流式 或 非流式」。

qrpc的请求和响应有相同的结构:帧,即代码中的Frame。

每个帧包括8字节的唯一标识,1字节的flag,3字节的命令,以及不超过可配置上限长度的负荷。

客户端会为每个请求帧自动生成8字节的唯一标识,服务端对应的响应帧会有相同的唯一标识。

通过这种方式,一个长链接可以同时发起多个请求,并且精确地知道每个请求对应的响应结果。

此外,请求和响应都可以由多个帧组成,类似http中的chunked传输模式,这就是前面提到的流式或非流式。

而所有关于是否阻塞、是否流式、是否主动推送的元信息,都包含在头部1字节的flag之中!

具体字段参考frame.md。

话不多说,干货开始!

阻塞模式

server.go:

package mainimport "github.com/zhiqiangxu/qrpc"const ( HelloCmd qrpc.Cmd = iota HelloRespCmd)func main() { // handler的作用是路由,根据请求帧的命令,分发到不同的处理子函数 handler := qrpc.NewServeMux() // 注册HelloCmd命令对应的子函数 handler.HandleFunc(HelloCmd, func(writer/*用于回写响应*/ qrpc.FrameWriter, request/*当前请求的相关信息*/ *qrpc.RequestFrame) { // 响应帧和请求帧有相同的唯一标识,并且这里把响应帧的命令设置为HelloRespCmd,会更方便调试 writer.StartWrite(request.RequestID, HelloRespCmd, 0) // 负荷部分为:hello world + 请求帧的原始负荷 writer.WriteBytes(append([]byte("hello world "), request.Payload...)) // 前面的StartWrite和WriteBytes其实是构建响应帧的过程 // 构建完毕后通过EndWrite触发实际的回写 writer.EndWrite() }) // ServerBinding用于配制想监听的端口以及对应的处理函数,如果想监听多个端口,提供多个即可 bindings := []qrpc.ServerBinding{ qrpc.ServerBinding{Addr: "0.0.0.0:8080", Handler: handler}} // 构建server server := qrpc.NewServer(bindings) // 开始监听 server.ListenAndServe()}

client.go:

package mainimport ( "fmt" "github.com/zhiqiangxu/qrpc")const ( HelloCmd qrpc.Cmd = iota)func main() { // 采用默认配置 conf := qrpc.ConnectionConfig{} // 建立一个qrpc的长链接 conn, _ := qrpc.NewConnection("0.0.0.0:8080", conf, nil) // 发起一个命令为HelloCmd的请求帧,flag空,负荷为xu _, resp, _ := conn.Request(HelloCmd, 0/*no flags*/, []byte("xu")) // 获取响应 frame, _ := resp.GetFrame() // 打印响应负荷 fmt.Println("resp is", string(frame.Payload))}

上面的例子中,由于flag为空,所以服务端会采用默认的串行处理模式。

非阻塞模式

要使用该模式,只需要修改client.go的一行代码:

- _, resp, _ := conn.Request(HelloCmd, 0/*no flags*/, []byte("xu"))+ _, resp, _ := conn.Request(HelloCmd, qrpc.NBFlag, []byte("xu"))

这样服务端便会并行处理这个长链接发来的请求!

流式

类似http中的chunked传输模式,不论请求还是响应,都可以拆成多个帧。

下面按流式请求和流式响应分别介绍。

流式请求:

streamclient.go:

package mainimport ( "fmt" "github.com/zhiqiangxu/qrpc")const ( HelloCmd qrpc.Cmd = iota)func main() { // 采用默认配置 conf := qrpc.ConnectionConfig{} // 建立一个qrpc的长链接 conn, _ := qrpc.NewConnection("0.0.0.0:8080", conf, nil) // 采用流式发送HelloCmd请求,第一个请求帧的负荷是first frame writer, resp, _ := conn.StreamRequest(HelloCmd, 0, []byte("first frame")) // 构建第二个请求帧,负荷是last frame writer.StartWrite(HelloCmd) writer.WriteBytes([]byte("last frame")) // 发送请求,并标记流式结束 writer.EndWrite(true) // will attach StreamEndFlag // 获取响应 frame, _ := resp.GetFrame() // 打印响应负荷 fmt.Println("resp is", string(frame.Payload))}

streamserver.go:

package mainimport ( "github.com/zhiqiangxu/qrpc")const ( HelloCmd qrpc.Cmd = iota HelloRespCmd)func main() { handler := qrpc.NewServeMux() handler.HandleFunc(HelloCmd, func(writer qrpc.FrameWriter, request *qrpc.RequestFrame) { // 首帧的处理类似非流式,只是EndWrite最后才会调用 writer.StartWrite(request.RequestID, HelloRespCmd, 0) writer.WriteBytes(append([]byte("first frame "), request.Payload...)) // 循环取出流式请求中的剩余帧 for { continueFrames := <-request.FrameCh() // continueFrames为nil表示该请求的所有帧获取完毕 if continueFrames == nil { break } // 将后续帧的负荷追加到响应中去 writer.WriteBytes(append([]byte(" continue frame "), continueFrames.Payload...)) } // 响应帧构建完毕,回写给客户端 writer.EndWrite() }) bindings := []qrpc.ServerBinding{ qrpc.ServerBinding{Addr: "0.0.0.0:8080", Handler: handler}} server := qrpc.NewServer(bindings) err := server.ListenAndServe() if err != nil { panic(err) }}

流式响应:

package mainimport ( "github.com/zhiqiangxu/qrpc" "fmt")const ( HelloCmd qrpc.Cmd = iota HelloRespCmd)func main() { handler := qrpc.NewServeMux() handler.HandleFunc(HelloCmd, func(writer qrpc.FrameWriter, request *qrpc.RequestFrame) { // 构建流式响应的第一帧 writer.StartWrite(request.RequestID, HelloRespCmd, qrpc.StreamFlag) writer.WriteBytes(append([]byte("first frame "), request.Payload...)) // 第一帧构建完毕,发送 writer.EndWrite() for { // 获取流式请求的后续帧 continueFrames := <-request.FrameCh() if continueFrames == nil { break } fmt.Printf("%s\n", continueFrames.Payload) // 构建流式响应的后续帧 if continueFrames.Flags.IsDone() { // 最后一帧,flag标记为qrpc.StreamEndFlag writer.StartWrite(request.RequestID, HelloRespCmd, qrpc.StreamEndFlag) } else { // 不是最后一帧,标记为qrpc.StreamFlag writer.StartWrite(request.RequestID, HelloRespCmd, qrpc.StreamFlag) } writer.WriteBytes(append([]byte(" continue frame "), continueFrames.Payload...)) // 后续帧构建完毕,发送 writer.EndWrite() } }) bindings := []qrpc.ServerBinding{ qrpc.ServerBinding{Addr: "0.0.0.0:8080", Handler: handler}} server := qrpc.NewServer(bindings) err := server.ListenAndServe() if err != nil { panic(err) }}

关键是qrpc.StreamFlag!

推送模式

package mainimport ( "github.com/zhiqiangxu/qrpc" "github.com/zhiqiangxu/util" "sync")const ( HelloCmd qrpc.Cmd = iota HelloRespCmd)func main() { handler := qrpc.NewServeMux() handler.HandleFunc(HelloCmd, func(writer qrpc.FrameWriter, request *qrpc.RequestFrame) { var ( wg sync.WaitGroup ) qserver := request.ConnectionInfo().Server() pushID := qserver.GetPushID() // 遍历所有长链接 qserver.WalkConn(0, func(writer qrpc.FrameWriter, ci *qrpc.ConnectionInfo) bool { util.GoFunc(&wg, func() { // PushFlag表示主动推送 writer.StartWrite(pushID, HelloCmd, qrpc.PushFlag) writer.WriteBytes([]byte("pushed msg")) writer.EndWrite() }) return true }) wg.Wait() writer.StartWrite(request.RequestID, HelloRespCmd, 0) writer.WriteBytes(append([]byte("push done"), request.Payload...)) writer.EndWrite() }) bindings := []qrpc.ServerBinding{ qrpc.ServerBinding{Addr: "0.0.0.0:8080", Handler: handler}} server := qrpc.NewServer(bindings) err := server.ListenAndServe() if err != nil { panic(err) }}

上述代码中,HelloCmd的处理子函数将给每个长链接推送一条消息!

客户端处理推送消息的方式如下:

- conn, _ := qrpc.NewConnection("0.0.0.0:8080", conf, nil)+ conn, _ := qrpc.NewConnection("0.0.0.0:8080", conf, func(conn *qrpc.Connection, pushedFrame *qrpc.Frame) {+ fmt.Println(pushedFrame)+ })

双向调用

前面的demo都是client调用server,其实client也可以注册回调让server调,参考测试用例中的TestClientHandler。

Performance

MBP单机压测数据

性能大概是http的 4 倍!

单独施压机,16C32G压测数据:

qps达到了203k,且此时cpu使用率为75%。

基于qrpc的项目

qchat (趣头条IM中台,未开源)qpush (趣头条Push中台,未开源)qfe (趣头条长链网关,未开源)kvrpc (一个支持事务的kv数据库)

社区

目前我们暂时只针对中国的用户,所以采用了微信群的交流方式,下面是二维码,有兴趣的同学可以扫码加入:

PS:扫码请注明来意,比如:学习qrpc或者go爱好者

Stargazers

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:#yyds干货盘点# 面试必刷TOP101: 删除链表的倒数第n个节点
下一篇:Number 类及各子类所占字节数源码分析
相关文章

 发表评论

暂时没有评论,来抢沙发吧~