0. 将任务按不同的类型分开,例如 DiskIO,NetworkIO,CPU,能够有效地提升性能。
假设一个循环的任务由 1)下载文件、2)解压缩、3)保存到硬盘三个部分组成,总计运行 N 次。
如果one by one地做完三件事,总时间是 N * (t1 + t2 + t3)
如果组成流水线,总时间缩短到 N * min(t1, t2, t3)
1. 每个阶段从它的上级获取输入channel,如果上级关闭了channel(操作完成或发生了异常),它应当立即结束。
1 | func Stage(input <-chan string) { |
2. 每个阶段自己创建输出channel,并保证操作完成或发生异常时关闭它,这通常通过 defer
来完成
1 | func Stage(input <-chan string) (<-chan string) { |
3. 为了处理突发的终止,例如超时或某个阶段发生异常,需要利用额外的channel来广播“取消”指令,这通常用 ctx.Done()
来完成
1 | func Stage(ctx context.Context, input <-chan string) (<-chan string) { |
4. 所有可能产生 error 的阶段,都需要返回一个额外的 channel 来标明:它是因为异常退出还是任务已完成,可以用 chan error
1 | func Stage(ctx context.Context, input <-chan string) (<-chan string, <-chan error) { |
5. 每个阶段应尽力监听 ctx.Done()
,一旦这个channel被关闭,这个阶段要尽快完成剩余的工作,并通过上一条中提到的 chan err
返回一个 ctx.Err()
的错误。通常至少在向输出管道发送消息时同时检查 ctx.Done()
1 | func Stage(ctx context.Context, input <-chan string) (<-chan string, <-chan error) { |
6. 在主线程中组装流水线,并阻塞在各个阶段的chan error,保证只有当所有阶段都退出时才会返回。
1 | paths, errcScan := Scan(ctx, root) |
7.你需要一个通用且优雅的实现
1 | func Stage(ctx context.Context, inputc <-chan *InputItem) (<-chan *OutputItem, <-chan error) { |