上篇文章中提到,Java 里把 Promise 叫作 CompletableFuture,相比那个只能用于线程同步的 Future,它新增了很多方法用于串联异步事件,比如常用的 thenApply
、thenCompose
、thenAccept
等。
如果不引入任何第三方库,CompletableFuture 仍是目前 Java 上最好的异步编程方式。之前一直觉得这个东西难用,直到我想明白一件事,证明了 CompletableFuture 虽然麻烦了点但是能做任何事情,然后用它的时候心里就没那么膈应了。
本文会以一个例子来讲解:如何把任意函数转换成异步调用风格。其实不一定要用 CompletableFuture,任何语言和框架都是适用的。
这篇文章不会涉及 CompletableFuture 的用法,你可以参考 Javadoc 或者这篇文章。
证明 CompletableFuture 是足够的
首先来(极不严谨地)说明一件事情,为什么 CompletableFuture 是足够用的,换句话说,证明 CompletableFuture 能表达一切计算流程。
如果你有一些函数式编程的基础,比如会一点 Haskell,这就是一句话的事情:CompletableFuture 其实是一个 Monad —— 因为它的 thenCompose
实现了 Monad 的 >>=
操作符。既然 Monad 能用来表示任何计算过程,CompletableFuture 当然也能。
1 | class Applicative m => Monad (m :: * -> *) where |
其实想想也很明白,Monad 表示一个带 context 的计算过程,比如可能抛异常之类的(纯函数是不会抛异常的)。CompletableFuture 也一样,他包裹一串计算过程并且处理异常。
如果看不懂上面的也没关系,我们用另一种方式再说明一下:
任何程序的流程控制都可以用 if
和 goto
来组合起来。无论是 for
还是 while
循环,desurge 之后不过就是 if
和 goto
的组合。通过 thenCompose
就可以表达 if
和 goto
:
这里说的不够严谨,其实 if 也是 surge,最终会变成条件跳转指令。
1 | cf.thenCompose(v -> { |
你看这个例子,if
和 goto
都有了,所以无论程序的控制流多复杂,我们都能组合出来。怎么组合?别急,下面我们就来讲这个。
CompletableFuture in Practice
我们从一个普通的函数开始。考虑到复杂性和完整性,我们用 Merge 2 Sorted Streams 作为演示,如果你不清楚这个是干嘛的,可以先做一下这道算法题。
下面是最普通的实现,输入两个数组,输出一个数组:
1 | Stream merge(Stream inputA, Stream inputB) { |
这个实现有什么问题呢?作为算法足够 OK。但是从工程意义上说,如果输入的 Stream 很大,包含 million 级的元素,那更好的方式是把 Stream 的输入输出作为 Iterator,只在 next()
的时候计算下一个需要的元素。这样内存占用是常数级的,完全不用担心数据量过大呢!
为了看清一步一步的变化过程,我们先假装 Java 有 Generator 语法。标记为 Generator 的函数不再是一个函数,而是类似一个 Iterator。一旦调用 next()
,“函数”代码运行到 yield
返回一个值,然后函数似乎停在了这里。下次 next()
,“函数”又接着刚刚的地方运行。
如果有 Generator 的话,函数应该长下面这样,注意 [yield]
:
1 | Stream merge(Stream inputA, Stream inputB) { |
哇,这个函数几乎没有改动,真是太方便了!(然而并没有卵用)
Function → Iterator
现在我们回到现实:Java 并没有 Generator 语法,所以我们要人肉实现一个 Generator。
为了通用性,首先做一个 desurge,把 while 循环改成 if
和 goto
的组合,这太简单了:
1 | Stream merge(Stream inputA, Stream inputB) { |
下一步是去掉 yield
,刚刚说到 Generator 的每次 next()
似乎会让函数停在一个地方,如何实现停在一个地方?记下来呗!加一个标记状态的变量,这个状态会告诉我下次 next()
的时候从哪里继续运行。
首先画出函数的控制流图,然后做一件事:想象所有的 yield
之后都有一个断点,我们在断点处切开,标记它为某个 State,这样下次 next()
的时候就能从断点继续。
下图的 S0 ~ S2 是我标记好的断点,S0 就是起始位置,S1 是两个 yield result
之后断下来的地方(恰好是同一个地方),S2 是 yield null
之后断下来的地方。
我们按照图中的 State
标记机械地把它切开,就得到了下面这个类,它就是由 merge()
变换得到的 Generator:
1 | class Merger implements Iterator<Integer> { |
别急,最后我们会简化这些充满废话的代码。
阶段性总结一下:到现在为止,我们做了一件伟大的事情——把一个函数变成了 Iterator,函数已经不再是函数,而是一个状态机,这个状态记录了下次调用 next()
需要从哪继续。
套用一下术语:“从哪继续”就是 Continuation,把 Continuation 搞出来的这个过程称为 CPS 变换。
Iterator → AsyncIterator
呃…… 说好的 CompletableFuture 呢?离 CompletableFuture 只有一步之遥了!
先从接口下手。想象两个 Stream Input 都是从 IO 拿到的数据,所以每次 next()
其实背后都是一次 IO,应该把它用 CompletableFuture 包成异步的,接口大概长这样:
1 | interface AsyncIterator<T> { |
类似刚刚引入 Generator 一样,我们再假装有 await
关键字。await
关键字表示异步地等待结果返回,有了它,函数就魔法般的暂停在等待异步 IO 的地方:
1 | Stream merge(Stream inputA, Stream inputB) { |
因为 await
也会暂停这个“函数”,所以和刚刚对 yield
的处理一样,我们想象 await
这里有一个断点,我们也要为它设置 State 标记:
糟糕!这状态数有点多啊!好在 Java 8 提供了 Lambda 表达式,和 CompletableFuture 搭配食用口味更佳。图中的大多数状态都可以借助 Lambda 表达式来实现,节约了不少代码:
1 | class Merger implements AsyncIterator<Integer> { |
Refinement
上面我们只用了 thenCompose
,理论上这是 OK 的,但是实际上 CompletableFuture 有上百个方法,最合适的才是坠吼的。
- 如果仅仅是返回一个值(而非阶段),可以用
thenApply
; thenCombine
等待两个 CompletableFuture 都完成了再去调用 BiFunction(T, U) -> R
来消费。
思考题:有兴趣的读者可以思考一下
thenCombine
的实现。
整理一下上面的代码,比如这样:
1 | class Merger implements AsyncIterator<Integer> { |
总结
任何函数都可以用 CompletableFuture 实现异步化,最通用的方式如下:
- 在函数里加上
yield
(返回下一个结果)和await
(等待输入值)来标记断点; - 画出控制流图,注意要在
yield
和await
处断开,断开处标记为状态; - 实现一个状态机类,把控制流图中的代码块、状态都无脑填进去,搞定。
这一刻,我们都是(人肉)编译器。