用 goroutine 和 channel 搭建流水线

0. 将任务按不同的类型分开,例如 DiskIO,NetworkIO,CPU,能够有效地提升性能。

假设一个循环的任务由 1)下载文件、2)解压缩、3)保存到硬盘三个部分组成,总计运行 N 次。

如果one by one地做完三件事,总时间是 N * (t1 + t2 + t3) 如果组成流水线,总时间缩短到 N * min(t1, t2, t3)

1. 每个阶段从它的上级获取输入channel,如果上级关闭了channel(操作完成或发生了异常),它应当立即结束。

1
2
3
4
5
6
7
func Stage(input <-chan string) {  
go func() {
for path := range paths {
// do something...
}
}
}

2. 每个阶段自己创建输出channel,并保证操作完成或发生异常时关闭它,这通常通过 defer 来完成

1
2
3
4
5
6
7
8
9
10
func Stage(input <-chan string) (<-chan string) {  
output := make(chan string, 5)
go func() {
defer close(output)
for path := range paths {
// do something...
}
}
return output
}

3. 为了处理突发的终止,例如超时或某个阶段发生异常,需要利用额外的channel来广播“取消”指令,这通常用 ctx.Done() 来完成

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func Stage(ctx context.Context, input <-chan string) (<-chan string) {  
output := make(chan string, 5)
go func() {
defer close(output)
for path := range paths {
select {
case <-ctx.Done():
return
default:
// do something...
}
}
}
return output
}

4. 所有可能产生 error 的阶段,都需要返回一个额外的 channel 来标明:它是因为异常退出还是任务已完成,可以用 chan error

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
func Stage(ctx context.Context, input <-chan string) (<-chan string, <-chan error) {  
output := make(chan string, 5)
errc := make(chan error, 1)
go func() {
defer close(output)
for path := range paths {
select {
case <-ctx.Done():
return
default:
// do something...
}
}
}
return output, errc
}

5. 每个阶段应尽力监听 ctx.Done(),一旦这个channel被关闭,这个阶段要尽快完成剩余的工作,并通过上一条中提到的 chan err 返回一个 ctx.Err() 的错误。通常至少在向输出管道发送消息时同时检查 ctx.Done()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func Stage(ctx context.Context, input <-chan string) (<-chan string, <-chan error) {  
output := make(chan string, 5)
errc := make(chan error, 1)
go func() {
defer close(output)
for path := range paths {
// do something...
select {
case <-ctx.Done():
errc <- ctx.Err()
return
case output <- x:
}
}
}
return output, errc
}

6. 在主线程中组装流水线,并阻塞在各个阶段的chan error,保证只有当所有阶段都退出时才会返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
paths, errcScan := Scan(ctx, root)  
readers, errcRead := Read(ctx, paths)
report, errcMerge := Merge(ctx, readers)
if err := Wait(ctx, errcScan, errcRead, errcMerge); err != nil {
return nil, err
}
return <-report, nil
func Wait(ctx context.Context, cancel context.CancelFunc, errcs ...<-chan error) error {
errs := make([]error, len(errcs))
var wg sync.WaitGroup
wg.Add(len(errcs))
for index, errc := range errcs {
go func(index int, errc <-chan error) {
defer wg.Done()
err := <-errc
if err != nil && err != ctx.Err() {
cancel() // notify all to stop
}
errs[index] = err
}(index, errc)
}
wg.Wait()
for _, err := range errs {
if err != nil && err != ctx.Err() {
return err
}
}
return errs[0]
}

7.你需要一个通用且优雅的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func Stage(ctx context.Context, inputc <-chan *InputItem) (<-chan *OutputItem, <-chan error) {  
outputc := make(chan *OutputItem, BUFFER_SIZE)
errc := make(chan error, 1)

doStage := func(input *InputItem) (*OutputItem, error) {
// Do something just like in a regular function
}
go func() {
defer close(outputc)
errc <- func() error {
for input := range inputc {
output, err := doStage(input)
if err != nil {
return err
}
select {
case outputc <- output:
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}()
}()
return outputc, errc
}

完整的例子:读取文件、解压缩、写入同一个文件

References

  1. Go语言并发模型:像Unix Pipe那样使用channel
  2. Go Blog: Go Concurrency Patterns: Pipelines and cancellation