除了标准库 sync 包外,这个官方包(golang.org/x下的)你应该了解(标准库的作用)

网友投稿 687 2022-08-19

除了标准库 sync 包外,这个官方包(golang.org/x下的)你应该了解(标准库的作用)

除了标准库 sync 包外,这个官方包(golang.org/x下的)你应该了解(标准库的作用)

x-files

这里的 x-files 指的是 golang.org/x/ 下的官方辅助包,本文我们介绍下 golang.org/x/sync 这个包。

为什么需要 x-files?

go 语言里使用 go 关键字就可以轻松搞定并发的问题,它讲究的是“不要通过共享内存来通信,而应该通过通信来共享内存”的原则,在 Go 里 channel 是个线程安全的多线程通讯介质。Go 本身的这些原语基本上可以解决常见的各种需求了。

但是有些时候在某些情况下,面对并发问题或在错误处理上需要引入更多的协调手段,比如,goroutines 会去访问一个非线程安全的资源(如 map,slice 等)。Go 的标准库给我们提供了 WaitGroup, Once, 和 Mutex, 如果再野一点还可以用 sync/atomic(原子操作包,偏底层,特殊场景的时候用到),这些工具结合上 chanel 确实可以解决并发场景下数据竞争的问题,但是这样做多少回让我们的实现变的比较繁琐(死板),难道就不能对这样的需求做下更优雅的抽象吗?

非常不幸,Go 的 sync 包不提供更高层次的模式和封装来解决我们的痛点。但有个非常好的消息是,golang.org/x/sync 把标准库没去做的事情给实现了。本文我就对这个辅助库进行介绍,会涉及 errgroup、semaphore、singleflight、syncmap 4 个工具的举例阐述,介绍他们如何解决常见的并发场景的需求。

快速了解下 sync 包

errgroup 的示例

import ( "fmt" "io/ioutil" "net/http" "golang.org/x/sync/errgroup" ) func main() { var g errgroup.Group var urls = []string{ "http://localhost:1234/sleep?sec=1", "http://localhost:1234/sleep?sec=2", "http://localhost:1234/sleep?sec=3", "https://www23.s232323o.com/", "http://localhost:1234/sleep?sec=4", "http://localhost:1234/sleep?sec=5", "http://localhost:1234/sleep?sec=6",

} for _, url := range urls { // 用g.Go 开启 goroutine 并行抓取URL url := url

g.Go(func() error { // 抓取url resp, err := http.Get(url) if err == nil {

fmt.Println(url)

b, e := ioutil.ReadAll(resp.Body)

fmt.Println(string(b), e) defer resp.Body.Close()

} return err

})

} // 等待全部的URL抓取完毕 if err := g.Wait(); err == nil {

fmt.Println("全部抓取成功.")

} else {

fmt.Println("抓取失败,原因是:", err)

}

}

以上代码示例会并行抓取 3 个 url,当有错误发生会报出来,结果如下:

http://localhost:1234/sleep?sec=1

I sleep 1 second.

http://localhost:1234/sleep?sec=2

I sleep 2 second.

http://localhost:1234/sleep?sec=3

I sleep 3 second.

http://localhost:1234/sleep?sec=4

I sleep 4 second.

http://localhost:1234/sleep?sec=5

I sleep 5 second.

http://localhost:1234/sleep?sec=6

I sleep 6 second.

抓取失败,原因是:Get https://www23.s232323o.com/: dial tcp: lookup www23.s232323o.com: no such host

模拟慢速的 api 的代码如下 sleep.go:

package main import ( "strconv" "time" "github.com/gin-gonic/gin" ) func main() {

router := gin.Default()

router.GET("/sleep", Sleep)

router.Run(":1234")

} func Sleep(c *gin.Context) {

sec := c.DefaultQuery("sec", "1")

secInt, _ := strconv.Atoi(sec)

time.Sleep(time.Duration(secInt) * time.Second)

c.String(200, "I sleep %s second.", sec)

}

一个并发运行的示例:

package main import ( "context" "fmt" "io/ioutil" "log" "net/http" "os" "golang.org/x/sync/errgroup" ) var (

Web = doSearch("web")

Image = doSearch("image")

Video = doSearch("video")

) var apiMap = map[string]string{"web": "https://so.com/s?q=%s", "image": "http://image.so.com/i?q=%s", "video": "https://video.360kan.com/v?q=%s"} func doSearch(kind string) Search { var api = apiMap[kind] return func(query string) (Result, error) {

api := fmt.Sprintf(api, query)

req, err := http.NewRequest("GET", api, nil) if err != nil {

log.Fatalf("%v", err)

}

client := http.DefaultClient

resp, err := client.Do(req) if err != nil {

log.Fatalf("%v", err)

} var body string if err == nil {

b, e := ioutil.ReadAll(resp.Body) if e == nil {

body = string(b)

} defer resp.Body.Close()

} return Result{fmt.Sprintf("%s result for %q\n:%s\n", kind, query, body)}, nil }

} type Result struct {

str string } type Search func(query string) (Result, error) func main() {

So := func(ctx context.Context, query string) ([]Result, error) {

g, ctx := errgroup.WithContext(ctx)

searches := []Search{Web, Image, Video}

results := make([]Result, len(searches)) for i, search := range searches {

i, search := i, search // 这里是关于闭包的一个坑,详细看这里 https://golang.org/doc/faq#closures_and_goroutines g.Go(func() error {

result, err := search(query) if err == nil {

results[i] = result

} return err

})

} if err := g.Wait(); err != nil { return nil, err

} return results, nil }

results, err := So(context.Background(), "golang") if err != nil {

fmt.Fprintln(os.Stderr, err) return } for _, result := range results {

fmt.Println(result)

}

}

这里用 360 搜索的 3 个频道为例,演示了并行抓取 3 类搜索接口数据的例子。

Action/Executor 模式

Action/Executor 模式出自 Gang of Four 的设计模式里的一种,也被叫做命令模式,是非常强大的一个设计模式。它从实际运行的方式里(Excutor)里抽象出了行为(Action),以下是这 2 个组件的基本接口:

// 一个Action接口,带一个Execute方法 type Action interface { // Execute 负责执行任务,这个方法会用context做参数,以便可以提供取消执行的功能 Execute(ctx context.Context) error

} // 一个Executor接口, 有个Execute方法可以执行多个Action。它根据实现的类型来定义action的并发、打开/关闭的失败行为. type Executor interface { // Execute 执行参数里提供的所有的action,通过调用他们对应的Execute方法。这个方式也会传递一个context用于终止操作。 Execute(ctx context.Context, actions []Action) error

} // ActionFunc 本类型定义如同Action接口的Execute方法,这就意味着可以运行使用一个独立的函数来充当Action type ActionFunc func(context.Context) error // Execute 实现了Action接口,把调用委托给了调用对象实现的函数 func (fn ActionFunc) Execute(ctx context.Context) error { return fn(ctx) }

Action 接口类型执行具体性的任务,Executro 负责把一组 Action 放在一起运行。创建一个 Executor 序列只是表面上的好处。但这会使并发的实现变的更有用,但是也需要更多的技巧。也就是说,Executor 需要很好的处理好错误且把 Actions 的生命周期同步好,这是至关重要的。

用 errgroup 来解决错误传播和取消

在我一开始写 Go 程序的时候,我非常迷信和滥用了 Go 的并发特性。我当时一时兴奋,却没有考虑到如果有多个 goroutine 有错误产出的时候会发生什么事情。

捕获到第一个错误后取消之后的 goroutine 执行,这是较为常见的处理模式。一个并行的 Executor 会被叠加到一起,使用一组 WaitGroups 和 channel 来完成协调。另一个优雅的实现方式就是使用 errgroup 包来解决,如下代码:

// Parallel 是一个Executor的并行实现 type Parallel struct{} // Execute方法并行的执行多个Action。如果第一个错误产生或者ctx被取消都会关闭执行 func (p Parallel) Execute(ctx context.Context, actions []Action) error {

grp, ctx := errgroup.WithContext(ctx) for _, a := range actions {

grp.Go(p.execFn(ctx, a))

} return grp.Wait()

} // execFn 将Context和Action绑定到errgroup.Group正确的函数签名上 func (p Parallel) execFn(ctx context.Context, a Action) func() error { return func() error { return a.Execute(ctx) }

}

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Golang源码系列一:Map实现原理分析(golang map原理)
下一篇:Go 如何防止缓存穿透(go的过去式)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~