基本介绍

经测试,goroutine池对于业务逻辑的执行效率(降低执行时间/CPU使用率)提升不大,甚至没有原生的goroutine执行快速(池化goroutine执行调度并没有底层go调度器高效,因为池化goroutine的执行调度也是基于底层go调度器),但是由于采用了复用的设计,池化后对内存的使用率得到极大的降低。在v2版本中grpool也加入了贯穿全局的链路追踪。

概念:

  1. Poolgoroutine池,用于管理若干可复用的goroutine协程资源;
  2. Worker:池对象中参与任务执行的goroutine,一个Worker可以执行若干个Job,直到队列中再无等待的Job
  3. Job:添加到池对象的任务队列中等待执行的任务,是一个func()的方法,一个Job同时只能被一个Worker获取并执行;

使用方式

使用场景

管理大量异步任务的场景、需要异步协程复用的场景、需要降低内存使用率的场景。

接口文档

  1. func Add(f func()) error
  2. func Jobs() int
  3. func Size() int
  4. type Pool
  5. func New(limit ...int) *Pool
  6. func (p *Pool) Add(ctx context.Context, f Func) error
  7. func (p *Pool) AddWithRecover(ctx context.Context, userFunc Func, recoverFunc ...func(err error)) error
  8. func (p *Pool) Cap() int
  9. func (p *Pool) Close()
  10. func (p *Pool) IsClosed() bool
  11. func (p *Pool) Jobs() int
  12. func (p *Pool) Size() int

通过grpool.New方法创建一个goroutine池对象,参数limit为非必需参数,用于限定池中的工作goroutine数量,默认为不限制。需要注意的是,任务可以不停地往池中添加,没有限制,但是工作的goroutine是可以做限制的。我们可以通过Size()方法查询当前的工作goroutine数量,使用Jobs()方法查询当前池中待处理的任务数量。

这个模块大家问得最多的是外部如何给grpool里面的任务传递参数,具体请看示例2。

使用示例

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gogf/gf/v2/os/gctx"
  6. "github.com/gogf/gf/v2/os/grpool"
  7. "github.com/gogf/gf/v2/os/gtimer"
  8. "time"
  9. )
  10. var (
  11. ctx = gctx.New()
  12. )
  13. func job(ctx context.Context) {
  14. time.Sleep(1*time.Second)
  15. }
  16. func main() {
  17. pool := grpool.New(100)
  18. for i := 0; i < 1000; i++ {
  19. pool.Add(ctx,job)
  20. }
  21. fmt.Println("worker:", pool.Size())
  22. fmt.Println(" jobs:", pool.Jobs())
  23. gtimer.SetInterval(ctx,time.Second, func(ctx context.Context) {
  24. fmt.Println("worker:", pool.Size())
  25. fmt.Println()
  26. })
  27. select {}
  28. }

这段程序中的任务函数的功能是sleep 1秒钟,这样便能充分展示出goroutine数量限制功能。其中,我们使用了gtime.SetInterval定时器每隔1秒钟打印出当前默认池中的工作goroutine数量以及待处理的任务数量。

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gogf/gf/v2/os/gctx"
  6. "github.com/gogf/gf/v2/os/grpool"
  7. "sync"
  8. )
  9. var (
  10. ctx = gctx.New()
  11. )
  12. func main() {
  13. wg := sync.WaitGroup{}
  14. for i := 0; i < 10; i++ {
  15. grpool.Add(ctx,func(ctx context.Context) {
  16. fmt.Println(i)
  17. wg.Done()
  18. })
  19. }
  20. wg.Wait()
  21. }

我们这段代码的目的是要顺序地打印出0-9,然而运行后却输出:

为什么呢?这里的执行结果无论是采用go关键字来执行还是grpool来执行都是如此。原因是,对于异步线程/协程来讲,函数进行异步执行注册时,该函数并未真正开始执行(注册时只在goroutine的栈中保存了变量i的内存地址),而一旦开始执行时函数才会去读取变量i的值,而这个时候变量i的值已经自增到了10。 清楚原因之后,改进方案也很简单了,就是在注册异步执行函数的时候,把当时变量i的值也一并传递获取;或者把当前变量i的值赋值给一个不会改变的临时变量,在函数中使用该临时变量而不是直接使用变量i

改进后的示例代码如下:

1)、使用go关键字

  1. package main
  2. import (
  3. "fmt"
  4. "sync"
  5. )
  6. func main() {
  7. wg := sync.WaitGroup{}
  8. for i := 0; i < 10; i++ {
  9. wg.Add(1)
  10. go func(v int){
  11. fmt.Println(v)
  12. wg.Done()
  13. }(i)
  14. }
  15. wg.Wait()
  16. }

执行后,输出结果为:

  1. 0
  2. 9
  3. 3
  4. 4
  5. 5
  6. 6
  7. 7
  8. 8
  9. 1
  10. 2

2)、使用临时变量

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gogf/gf/v2/os/gctx"
  6. "github.com/gogf/gf/v2/os/grpool"
  7. "sync"
  8. )
  9. var (
  10. ctx = gctx.New()
  11. )
  12. func main() {
  13. wg := sync.WaitGroup{}
  14. for i := 0; i < 10; i++ {
  15. wg.Add(1)
  16. v := i
  17. grpool.Add(ctx, func(ctx context.Context) {
  18. fmt.Println(v)
  19. wg.Done()
  20. })
  21. }
  22. wg.Wait()
  23. }

执行后,输出结果为:

这里可以看到,使用grpool进行任务注册时,注册方法为,因此无法在任务注册时把变量i的值注册进去(请尽量不要通过ctx传递业务参数),因此只能采用临时变量的形式来传递当前变量i的值。

AddWithRecover将新作业推送到具有指定恢复功能的池中。当userFunc执行过程中出现panic时,会调用可选的Recovery Func。如果没有传入Recovery Func或赋空,则忽略userFunc引发的panic。该作业将异步执行。

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gogf/gf/v2/container/garray"
  6. "github.com/gogf/gf/v2/os/gctx"
  7. "github.com/gogf/gf/v2/os/grpool"
  8. "time"
  9. )
  10. var (
  11. ctx = gctx.New()
  12. )
  13. func main() {
  14. array := garray.NewArray(true)
  15. grpool.AddWithRecover(ctx, func(ctx context.Context) {
  16. array.Append(1)
  17. array.Append(2)
  18. panic(1)
  19. }, func(err error) {
  20. array.Append(1)
  21. })
  22. grpool.AddWithRecover(ctx, func(ctx context.Context) {
  23. panic(1)
  24. array.Append(1)
  25. })
  26. time.Sleep(500 * time.Millisecond)
  27. fmt.Print(array.Len())
  28. }

1)、grpool

  1. package main
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/gogf/gf/v2/os/gctx"
  6. "github.com/gogf/gf/v2/os/grpool"
  7. "github.com/gogf/gf/v2/os/gtime"
  8. "sync"
  9. "time"
  10. )
  11. var (
  12. ctx = gctx.New()
  13. )
  14. func main() {
  15. start := gtime.TimestampMilli()
  16. wg := sync.WaitGroup{}
  17. for i := 0; i < 10000000; i++ {
  18. wg.Add(1)
  19. grpool.Add(ctx,func(ctx context.Context) {
  20. time.Sleep(time.Millisecond)
  21. wg.Done()
  22. })
  23. }
  24. wg.Wait()
  25. fmt.Println(grpool.Size())
  26. fmt.Println("time spent:", gtime.TimestampMilli() - start)
  27. }

2)、goroutine

  1. package main
  2. import (
  3. "fmt"
  4. "github.com/gogf/gf/v2/os/gtime"
  5. "sync"
  6. "time"
  7. )
  8. func main() {
  9. start := gtime.TimestampMilli()
  10. wg := sync.WaitGroup{}
  11. for i := 0; i < 10000000; i++ {
  12. wg.Add(1)
  13. go func() {
  14. time.Sleep(time.Millisecond)
  15. wg.Done()
  16. }()
  17. }
  18. wg.Wait()
  19. fmt.Println("time spent:", gtime.TimestampMilli() - start)

3)、运行结果比较

可以看到池化过后,执行相同数量的任务,数量减少很多,相对的内存也降低了一倍以上,CPU时间耗时也勉强可以接受。