golang协程池gopool怎么设计与实现-mile米乐体育

golang协程池gopool怎么设计与实现

这篇文章主要介绍“golang协程池gopool怎么设计与实现”,在日常操作中,相信很多人在golang协程池gopool怎么设计与实现问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”golang协程池gopool怎么设计与实现”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

goroutine

goroutine 是 golang 提供的一种轻量级线程,我们通常称之为「协程」,相比较线程,创建一个协程的成本是很低的。所以你会经常看到 golang 开发的应用出现上千个协程并发的场景。

goroutine 的优势:

  • 与线程相比,goroutines 成本很低。

它们的堆栈大小只有几 kb,堆栈可以根据应用程序的需要增长和缩小,context switch 也很快,而在线程的情况下,堆栈大小必须指定并固定。

  • goroutine 被多路复用到更少数量的 os 线程。

一个包含数千个 goroutine 的程序中可能只有一个线程。如果该线程中的任何 goroutine 阻塞等待用户输入,则创建另一个 os 线程并将剩余的 goroutine 移动到新的 os 线程。所有这些都由运行时处理,作为开发者无需耗费心力关心,这也使得我们有很干净的 api 来支持并发。

  • goroutines 使用 channel 进行通信。

channel 的设计有效防止了在使用 goroutine 访问共享内存时发生竞争条件(race conditions) 。channel 可以被认为是 goroutine 进行通信的管道。

协程池

在高并发场景下,我们可能会启动大量的协程来处理业务逻辑。协程池是一种利用池化技术,复用对象,减少内存分配的频率以及协程创建开销,从而提高协程执行效率的技术。

最近抽空了解了字节官方开源的 gopkg 库提供的 gopool 协程池实现,感觉还是很高质量的,代码也非常简洁清晰,而且 kitex 底层也在使用 gopool 来管理协程,这里我们梳理一下设计和实现。

gopool

了解官方 readme 就会发现gopool的用法其实非常简单,将曾经我们经常使用的go func(){...}替换为gopool.go(func(){...}) 即可。

此时 gopool 将会使用默认的配置来管理你启动的协程,你也可以选择针对业务场景配置池子大小,以及扩容上限。

old:

gofunc(){//doyourjob}()

new:

import("github.com/bytedance/gopkg/util/gopool")gopool.go(func(){///doyourjob})

核心实现

下面我们来看看gopool是怎样实现协程池管理的。

pool

pool 是一个定义了协程池能力的接口。

typepoolinterface{//池子的名称name()string//设置池子内goroutine的容量setcap(capint32)//执行f函数go(ffunc())//带ctx,执行f函数ctxgo(ctxcontext.context,ffunc())//设置发生panic时调用的函数setpanichandler(ffunc(context.context,interface{}))}

gopool 提供了这个接口的默认实现(即下面即将介绍的pool),当我们直接调用 gopool.ctxgo 时依赖的就是这个。

这样的设计模式在 kitex 中也经常出现,所有的依赖均设计为接口,便于随后扩展,底层提供一个默认的实现暴露出去,这样对调用方也很友好。

typepoolstruct{//池子名称namestring//池子的容量,即最大并发工作的goroutine的数量capint32//池子配置config*config//task链表taskhead*tasktasktail*tasktasklocksync.mutextaskcountint32//记录当前正在运行的worker的数量workercountint32//当worker出现panic时被调用panichandlerfunc(context.context,interface{})}//newpool创建一个新的协程池,初始化名称,容量,配置funcnewpool(namestring,capint32,config*config)pool{p:=&pool{name:name,cap:cap,config:config,}returnp}

调用 newpool 获取了以 pool 的形式返回的 pool 结构体。

task

typetaskstruct{ctxcontext.contextffunc()next*task}

task 是一个链表结构,可以把它理解为一个待执行的任务,它包含了当前节点需要执行的函数f, 以及指向下一个task的指针。

综合前一节 pool 的定义,我们可以看到,一个协程池 pool 对应了一组task

pool 维护了指向链表的头尾的两个指针:taskheadtasktail,以及链表的长度taskcount 和对应的锁 tasklock

worker

typeworkerstruct{pool*pool}

一个 worker 就是逻辑上的一个执行器,它唯一对应到一个协程池 pool。当一个worker被唤起,将会开启一个goroutine ,不断地从 pool 中的 task链表获取任务并执行。

func(w*worker)run(){gofunc(){for{//声明即将执行的taskvart*task//操作pool中的task链表,加锁w.pool.tasklock.lock()ifw.pool.taskhead!=nil{//拿到taskhead准备执行t=w.pool.taskhead//更新链表的head以及数量w.pool.taskhead=w.pool.taskhead.nextatomic.addint32(&w.pool.taskcount,-1)}//如果前一步拿到的taskhead为空,说明无任务需要执行,清理后返回ift==nil{w.close()w.pool.tasklock.unlock()w.recycle()return}w.pool.tasklock.unlock()//执行任务,针对panic会recover,并调用配置的handlerfunc(){deferfunc(){ifr:=recover();r!=nil{msg:=fmt.sprintf("gopool:panicinpool:%s:%v:%s",w.pool.name,r,debug.stack())logger.ctxerrorf(t.ctx,msg)ifw.pool.panichandler!=nil{w.pool.panichandler(t.ctx,r)}}}()t.f()}()t.recycle()}}()}

整体来看

看到这里,其实就能把整个流程串起来了。我们来看看对外的接口 ctxgo(context.context, f func()) 到底做了什么?

funcgo(ffunc()){ctxgo(context.background(),f)}funcctxgo(ctxcontext.context,ffunc()){defaultpool.ctxgo(ctx,f)}func(p*pool)ctxgo(ctxcontext.context,ffunc()){//创建一个task对象,将ctx和待执行的函数赋值t:=taskpool.get().(*task)t.ctx=ctxt.f=f//将task插入pool的链表的尾部,更新链表数量p.tasklock.lock()ifp.taskhead==nil{p.taskhead=tp.tasktail=t}else{p.tasktail.next=tp.tasktail=t}p.tasklock.unlock()atomic.addint32(&p.taskcount,1)//以下两个条件满足时,创建新的worker并唤起执行://1.task的数量超过了配置的限制//2.当前运行的worker数量小于上限(或无worker运行)if(atomic.loadint32(&p.taskcount)>=p.config.scalethreshold&&p.workercount()

相信看了代码注释,大家就能理解发生了什么。

gopool 会自行维护一个 defaultpool,这是一个默认的 pool 结构体,在引入包的时候就进行初始化。当我们直接调用 gopool.ctxgo() 时,本质上是调用了 defaultpool 的同名方法

funcinit(){defaultpool=newpool("gopool.defaultpool",10000,newconfig())}const(defaultscalathreshold=1)//configisusedtoconfigpool.typeconfigstruct{//控制扩容的门槛,一旦待执行的task超过此值,且worker数量未达到上限,就开始启动新的workerscalethresholdint32}//newconfigcreatesadefaultconfig.funcnewconfig()*config{c:=&config{scalethreshold:defaultscalathreshold,}returnc}

defaultpool 的名称为 gopool.defaultpool,池子容量一万,扩容下限为 1。

当我们调用 ctxgo时,gopool 就会更新维护的任务链表,并且判断是否需要扩容 worker

  • 若此时已经有很多 worker 启动(底层一个 worker 对应一个 goroutine),不需要扩容,就直接返回。

  • 若判断需要扩容,就创建一个新的worker,并调用 worker.run()方法启动,各个worker会异步地检查 pool 里面的任务链表是否还有待执行的任务,如果有就执行。

三个角色的定位

  • task 是一个待执行的任务节点,同时还包含了指向下一个任务的指针,链表结构;

  • worker 是一个实际执行任务的执行器,它会异步启动一个 goroutine 执行协程池里面未执行的task

  • pool 是一个逻辑上的协程池,对应了一个task链表,同时负责维护task状态的更新,以及在需要的时候创建新的 worker

使用 sync.pool 进行性能优化

其实到这个地方,gopool已经是一个代码简洁清晰的协程池库了,但是性能上显然有改进空间,所以gopool的作者应用了多次 sync.pool 来池化对象的创建,复用woker和task对象。

这里建议大家直接看源码,其实在上面的代码中已经有所涉及。

  • task 池化

vartaskpoolsync.poolfuncinit(){taskpool.new=newtask}funcnewtask()interface{}{return&task{}}func(t*task)recycle(){t.zero()taskpool.put(t)}

  • worker 池化

varworkerpoolsync.poolfuncinit(){workerpool.new=newworker}funcnewworker()interface{}{return&worker{}}func(w*worker)recycle(){w.zero()workerpool.put(w)}

到此,关于“golang协程池gopool怎么设计与实现”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注恰卡编程网网站,小编会继续努力为大家带来更多实用的文章!

展开全文
内容来源于互联网和用户投稿,文章中一旦含有米乐app官网登录的联系方式务必识别真假,本站仅做信息展示不承担任何相关责任,如有侵权或涉及法律问题请联系米乐app官网登录删除

最新文章

网站地图