基本介绍
经测试,goroutine
池对于业务逻辑的执行效率(降低执行时间/CPU使用率)提升不大,甚至没有原生的goroutine
执行快速(池化goroutine
执行调度并没有底层go调度器高效,因为池化goroutine
的执行调度也是基于底层go调度器),但是由于采用了复用的设计,池化后对内存的使用率得到极大的降低。在v2
版本中grpool
也加入了贯穿全局的链路追踪。
概念:
Pool
:goroutine
池,用于管理若干可复用的goroutine
协程资源;Worker
:池对象中参与任务执行的goroutine
,一个Worker
可以执行若干个Job
,直到队列中再无等待的Job
;Job
:添加到池对象的任务队列中等待执行的任务,是一个func()
的方法,一个Job
同时只能被一个Worker
获取并执行;
使用方式:
使用场景:
管理大量异步任务的场景、需要异步协程复用的场景、需要降低内存使用率的场景。
接口文档:
func Add(f func()) error
func Jobs() int
func Size() int
type Pool
func New(limit ...int) *Pool
func (p *Pool) Add(ctx context.Context, f Func) error
func (p *Pool) AddWithRecover(ctx context.Context, userFunc Func, recoverFunc ...func(err error)) error
func (p *Pool) Cap() int
func (p *Pool) Close()
func (p *Pool) IsClosed() bool
func (p *Pool) Jobs() int
func (p *Pool) Size() int
通过grpool.New
方法创建一个goroutine池
对象,参数limit
为非必需参数,用于限定池中的工作goroutine
数量,默认为不限制。需要注意的是,任务可以不停地往池中添加,没有限制,但是工作的goroutine
是可以做限制的。我们可以通过Size()
方法查询当前的工作goroutine
数量,使用Jobs()
方法查询当前池中待处理的任务数量。
这个模块大家问得最多的是外部如何给grpool
里面的任务传递参数,具体请看示例2。
使用示例
package main
import (
"context"
"fmt"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/v2/os/grpool"
"github.com/gogf/gf/v2/os/gtimer"
"time"
)
var (
ctx = gctx.New()
)
func job(ctx context.Context) {
time.Sleep(1*time.Second)
}
func main() {
pool := grpool.New(100)
for i := 0; i < 1000; i++ {
pool.Add(ctx,job)
}
fmt.Println("worker:", pool.Size())
fmt.Println(" jobs:", pool.Jobs())
gtimer.SetInterval(ctx,time.Second, func(ctx context.Context) {
fmt.Println("worker:", pool.Size())
fmt.Println()
})
select {}
}
这段程序中的任务函数的功能是sleep 1秒钟
,这样便能充分展示出goroutine数量限制功能。其中,我们使用了gtime.SetInterval
定时器每隔1秒钟打印出当前默认池中的工作goroutine
数量以及待处理的任务数量。
package main
import (
"context"
"fmt"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/v2/os/grpool"
"sync"
)
var (
ctx = gctx.New()
)
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
grpool.Add(ctx,func(ctx context.Context) {
fmt.Println(i)
wg.Done()
})
}
wg.Wait()
}
我们这段代码的目的是要顺序地打印出0-9,然而运行后却输出:
为什么呢?这里的执行结果无论是采用go
关键字来执行还是grpool
来执行都是如此。原因是,对于异步线程/协程来讲,函数进行异步执行注册时,该函数并未真正开始执行(注册时只在goroutine
的栈中保存了变量i
的内存地址),而一旦开始执行时函数才会去读取变量i
的值,而这个时候变量i
的值已经自增到了10
。 清楚原因之后,改进方案也很简单了,就是在注册异步执行函数的时候,把当时变量i
的值也一并传递获取;或者把当前变量i的值赋值给一个不会改变的临时变量,在函数中使用该临时变量而不是直接使用变量i
。
改进后的示例代码如下:
1)、使用go关键字
package main
import (
"fmt"
"sync"
)
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func(v int){
fmt.Println(v)
wg.Done()
}(i)
}
wg.Wait()
}
执行后,输出结果为:
0
9
3
4
5
6
7
8
1
2
2)、使用临时变量
package main
import (
"context"
"fmt"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/v2/os/grpool"
"sync"
)
var (
ctx = gctx.New()
)
func main() {
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
v := i
grpool.Add(ctx, func(ctx context.Context) {
fmt.Println(v)
wg.Done()
})
}
wg.Wait()
}
执行后,输出结果为:
这里可以看到,使用grpool
进行任务注册时,注册方法为,因此无法在任务注册时把变量i
的值注册进去(请尽量不要通过ctx
传递业务参数),因此只能采用临时变量的形式来传递当前变量i
的值。
AddWithRecover
将新作业推送到具有指定恢复功能的池中。当userFunc
执行过程中出现panic
时,会调用可选的Recovery Func
。如果没有传入Recovery Func
或赋空,则忽略userFunc
引发的panic
。该作业将异步执行。
package main
import (
"context"
"fmt"
"github.com/gogf/gf/v2/container/garray"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/v2/os/grpool"
"time"
)
var (
ctx = gctx.New()
)
func main() {
array := garray.NewArray(true)
grpool.AddWithRecover(ctx, func(ctx context.Context) {
array.Append(1)
array.Append(2)
panic(1)
}, func(err error) {
array.Append(1)
})
grpool.AddWithRecover(ctx, func(ctx context.Context) {
panic(1)
array.Append(1)
})
time.Sleep(500 * time.Millisecond)
fmt.Print(array.Len())
}
1)、grpool
package main
import (
"context"
"fmt"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/v2/os/grpool"
"github.com/gogf/gf/v2/os/gtime"
"sync"
"time"
)
var (
ctx = gctx.New()
)
func main() {
start := gtime.TimestampMilli()
wg := sync.WaitGroup{}
for i := 0; i < 10000000; i++ {
wg.Add(1)
grpool.Add(ctx,func(ctx context.Context) {
time.Sleep(time.Millisecond)
wg.Done()
})
}
wg.Wait()
fmt.Println(grpool.Size())
fmt.Println("time spent:", gtime.TimestampMilli() - start)
}
2)、goroutine
package main
import (
"fmt"
"github.com/gogf/gf/v2/os/gtime"
"sync"
"time"
)
func main() {
start := gtime.TimestampMilli()
wg := sync.WaitGroup{}
for i := 0; i < 10000000; i++ {
wg.Add(1)
go func() {
time.Sleep(time.Millisecond)
wg.Done()
}()
}
wg.Wait()
fmt.Println("time spent:", gtime.TimestampMilli() - start)
3)、运行结果比较
可以看到池化过后,执行相同数量的任务,数量减少很多,相对的内存也降低了一倍以上,CPU时间耗时也勉强可以接受。