0. 将任务按不同的类型分开,例如
DiskIO,NetworkIO,CPU,能够有效地提升性能。
假设一个循环的任务由
1)下载文件、2)解压缩、3)保存到硬盘三个部分组成,总计运行 N 次。
如果one by one地做完三件事,总时间是 N * (t1 + t2 + t3)
如果组成流水线,总时间缩短到 N * min(t1, t2, t3)
1.
每个阶段从它的上级获取输入channel,如果上级关闭了channel(操作完成或发生了异常),它应当立即结束。
1 2 3 4 5 6 7
| func Stage(input <-chan string) { go func() { for path := range paths { } } }
|
2.
每个阶段自己创建输出channel,并保证操作完成或发生异常时关闭它,这通常通过
defer
来完成
1 2 3 4 5 6 7 8 9 10
| func Stage(input <-chan string) (<-chan string) { output := make(chan string, 5) go func() { defer close(output) for path := range paths { } } return output }
|
3.
为了处理突发的终止,例如超时或某个阶段发生异常,需要利用额外的channel来广播“取消”指令,这通常用
ctx.Done()
来完成
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| func Stage(ctx context.Context, input <-chan string) (<-chan string) { output := make(chan string, 5) go func() { defer close(output) for path := range paths { select { case <-ctx.Done(): return default: } } } return output }
|
4. 所有可能产生 error 的阶段,都需要返回一个额外的 channel
来标明:它是因为异常退出还是任务已完成,可以用
chan error
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| func Stage(ctx context.Context, input <-chan string) (<-chan string, <-chan error) { output := make(chan string, 5) errc := make(chan error, 1) go func() { defer close(output) for path := range paths { select { case <-ctx.Done(): return default: } } } return output, errc }
|
5. 每个阶段应尽力监听
ctx.Done()
,一旦这个channel被关闭,这个阶段要尽快完成剩余的工作,并通过上一条中提到的
chan err
返回一个 ctx.Err()
的错误。通常至少在向输出管道发送消息时同时检查
ctx.Done()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| func Stage(ctx context.Context, input <-chan string) (<-chan string, <-chan error) { output := make(chan string, 5) errc := make(chan error, 1) go func() { defer close(output) for path := range paths { select { case <-ctx.Done(): errc <- ctx.Err() return case output <- x: } } } return output, errc }
|
6. 在主线程中组装流水线,并阻塞在各个阶段的chan
error,保证只有当所有阶段都退出时才会返回。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| paths, errcScan := Scan(ctx, root) readers, errcRead := Read(ctx, paths) report, errcMerge := Merge(ctx, readers) if err := Wait(ctx, errcScan, errcRead, errcMerge); err != nil { return nil, err } return <-report, nil func Wait(ctx context.Context, cancel context.CancelFunc, errcs ...<-chan error) error { errs := make([]error, len(errcs)) var wg sync.WaitGroup wg.Add(len(errcs)) for index, errc := range errcs { go func(index int, errc <-chan error) { defer wg.Done() err := <-errc if err != nil && err != ctx.Err() { cancel() } errs[index] = err }(index, errc) } wg.Wait() for _, err := range errs { if err != nil && err != ctx.Err() { return err } } return errs[0] }
|
7.你需要一个通用且优雅的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| func Stage(ctx context.Context, inputc <-chan *InputItem) (<-chan *OutputItem, <-chan error) { outputc := make(chan *OutputItem, BUFFER_SIZE) errc := make(chan error, 1)
doStage := func(input *InputItem) (*OutputItem, error) { } go func() { defer close(outputc) errc <- func() error { for input := range inputc { output, err := doStage(input) if err != nil { return err } select { case outputc <- output: case <-ctx.Done(): return ctx.Err() } } return nil }() }() return outputc, errc }
|
完整的例子:读取文件、解压缩、写入同一个文件
References
- Go语言并发模型:像Unix
Pipe那样使用channel
- Go Blog: Go Concurrency
Patterns: Pipelines and cancellation