上篇文章 中提到,Java 里把 Promise
叫作 CompletableFuture,相比那个只能用于线程同步的
Future,它新增了很多方法用于串联异步事件,比如常用的
thenApply
、thenCompose
、thenAccept
等。
如果不引入任何第三方库,CompletableFuture 仍是目前 Java
上最好的异步编程方式。之前一直觉得这个东西难用,直到我想明白一件事,证明了
CompletableFuture
虽然麻烦了点但是能做任何事情,然后用它的时候心里就没那么膈应了。
本文会以一个例子来讲解:如何把任意函数转换成异步调用风格 。其实不一定要用
CompletableFuture,任何语言和框架都是适用的。
这篇文章不会涉及 CompletableFuture 的用法,你可以参考 Javadoc 或者这篇文章 。
证明 CompletableFuture
是足够的
首先来(极不严谨地)说明一件事情,为什么 CompletableFuture
是足够用的 ,换句话说,证明 CompletableFuture
能表达一切计算流程 。
如果你有一些函数式编程的基础,比如会一点
Haskell,这就是一句话的事情:CompletableFuture 其实是一个 Monad ——
因为它的 thenCompose
实现了 Monad 的 >>=
操作符。既然 Monad 能用来表示任何计算过程,CompletableFuture
当然也能。
1 2 3 4 5 6 class Applicative m => Monad (m :: * -> *) where (>>=) :: m a -> (a -> m b) -> m b (>>) :: m a -> m b -> m b return :: a -> m a fail :: String -> m a {-# MINIMAL (>>=) #-}
其实想想也很明白,Monad 表示一个带 context
的计算过程,比如可能抛异常之类的(纯函数是不会抛异常的)。CompletableFuture
也一样,他包裹一串计算过程并且处理异常。
如果看不懂上面的也没关系,我们用另一种方式再说明一下:
任何程序的流程控制都可以用 if
和 goto
来组合起来。无论是 for
还是 while
循环,desurge 之后不过就是 if
和 goto
的组合。通过 thenCompose
就可以表达 if
和 goto
:
这里说的不够严谨,其实 if 也是 surge,最终会变成条件跳转指令。
1 2 3 4 5 6 7 cf.thenCompose(v -> { if (v < 100 ) { return doStage1(); } else { return doStage2(); } })
你看这个例子,if
和 goto
都有了,所以无论程序的控制流多复杂,我们都能组合出来。怎么组合?别急,下面我们就来讲这个。
CompletableFuture in
Practice
我们从一个普通的函数开始。考虑到复杂性和完整性,我们用 Merge 2
Sorted Streams 作为演示,如果你不清楚这个是干嘛的,可以先做一下这道算法题 。
下面是最普通的实现,输入两个数组,输出一个数组:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 Stream merge (Stream inputA, Stream inputB) { List<Integer> results = new ArrayList <>(); Integer headA = inputA.next(); Integer headB = inputB.next(); while (headA != null || headB != null ) { if (headA == null || headB != null && headA > headB) { results.add(headB); headB = inputB.next(); } else { results.add(headA); headA = inputA.next(); } } return new Stream (results); } class Stream { private final Queue<Integer> numbers; public Stream (List<Integer> numbers) { this .numbers = new LinkedList <>(numbers); } public Integer next () { return numbers.poll(); } }
这个实现有什么问题呢?作为算法足够 OK。但是从工程意义上说,如果输入的
Stream 很大,包含 million 级的元素,那更好的方式是把 Stream
的输入输出作为 Iterator,只在 next()
的时候计算下一个需要的元素。这样内存占用是常数级的,完全不用担心数据量过大呢!
为了看清一步一步的变化过程,我们先假装 Java 有 Generator
语法 。标记为 Generator 的函数不再是一个函数,而是类似一个
Iterator。一旦调用 next()
,“函数”代码运行到
yield
返回一个值,然后函数似乎停在 了这里。下次
next()
,“函数”又接着刚刚的地方运行。
如果有 Generator 的话,函数应该长下面这样,注意
[yield]
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Stream merge (Stream inputA, Stream inputB) { Integer headA = inputA.next(); Integer headB = inputB.next(); while (headA != null || headB != null ) { if (headA == null || headB != null && headA > headB) { [yield ] headB; headB = inputB.next(); } else { [yield ] headA; headA = inputA.next(); } } [yield ] null ; }
哇,这个函数几乎没有改动,真是太方便了!(然而并没有卵用)
Function → Iterator
现在我们回到现实:Java 并没有 Generator 语法,所以我们要人肉实现一个
Generator。
为了通用性,首先做一个 desurge,把 while 循环改成 if
和
goto
的组合,这太简单了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 Stream merge (Stream inputA, Stream inputB) { Integer headA = inputA.next(); Integer headB = inputB.next(); WHILE_LOOP: if (headA != null || headB != null ) { if (headA == null || headB != null && headA > headB) { [yield ] headB; headB = inputB.next(); } else { [yield ] headA; headA = inputA.next(); } goto WHILE_LOOP; } [yield ] null ; }
下一步是去掉 yield
,刚刚说到 Generator 的每次
next()
似乎会让函数停在 一个地方,如何实现停在 一个地方?记下来呗!加一个标记状态 的变量,这个状态会告诉我下次
next()
的时候从哪里继续运行。
首先画出函数的控制流图,然后做一件事:想象所有的 yield
之后都有一个断点,我们在断点处切开,标记它为某个 State,这样下次
next()
的时候就能从断点继续。
下图的 S0 ~ S2 是我标记好的断点,S0 就是起始位置,S1 是两个
yield result
之后断下来的地方(恰好是同一个地方),S2 是
yield null
之后断下来的地方。
我们按照图中的 State
标记机械地把它切开,就得到了下面这个类,它就是由 merge()
变换得到的 Generator:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 class Merger implements Iterator <Integer> { final Iterator inputA; final Iterator inputB; private int state = 0 ; private Integer headA; private Integer headB; public Merger (Iterator inputA, Iterator inputB) { this .inputA = inputA; this .inputB = inputB; } public Integer next () { for (;;) { switch (state) { case 0 : headA = inputA.next(); headB = inputB.next(); state = 1 ; break ; case 1 : if (headA != null || headB != null ) { if (headA == null || headB != null && headA > headB ) { final int result = headB; headB = inputB.next(); state = 1 ; return result; } else { final int result = headA; headA = inputA.next(); state = 1 ; return result; } } else { state = 2 ; return null ; } case 2 : throw new IllegalStateException ("Generator has been exhausted!" ); default : throw new AssertionError ("Unreachable!" ); } } } }
别急,最后我们会简化这些充满废话的代码。
阶段性总结一下:到现在为止,我们做了一件伟大的事情——把一个函数变成了
Iterator,函数已经不再是函数,而是一个状态机,这个状态记录了下次调用
next()
需要从哪继续 。
套用一下术语:“从哪继续”就是 Continuation ,把
Continuation 搞出来的这个过程称为 CPS
变换 。
Iterator → AsyncIterator
呃…… 说好的 CompletableFuture 呢?离 CompletableFuture
只有一步之遥了!
先从接口下手。想象两个 Stream Input 都是从 IO 拿到的数据,所以每次
next()
其实背后都是一次 IO,应该把它用 CompletableFuture
包成异步的,接口大概长这样:
1 2 3 interface AsyncIterator <T> { CompletableFuture<T> next () ; }
类似刚刚引入 Generator 一样,我们再假装有 await
关键字。await
关键字表示异步地等待结果返回,有了它,函数就魔法般的暂停在等待异步 IO
的地方:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 Stream merge (Stream inputA, Stream inputB) { Integer headA = inputA.next(); Integer headB = inputB.next(); WHILE_LOOP: if (headA != null || headB != null ) { if (headA == null || headB != null && headA > headB) { Integer result = headB; headB = [await] inputB.next(); [yield ] result; } else { Integer result = headA; headA = [await] inputA.next(); [yield ] result; } goto WHILE_LOOP; } [yield ] null ; }
因为 await
也会暂停这个“函数”,所以和刚刚对
yield
的处理一样,我们想象 await
这里有一个断点,我们也要为它设置 State 标记:
糟糕!这状态数有点多啊!好在 Java 8 提供了 Lambda 表达式,和
CompletableFuture 搭配食用口味更佳。图中的大多数状态都可以借助 Lambda
表达式来实现,节约了不少代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 class Merger implements AsyncIterator <Integer> { final Stream inputA; final Stream inputB; private int state = 0 ; private Integer headA; private Integer headB; public Merger (Stream inputA, Stream inputB) { this .inputA = inputA; this .inputB = inputB; } public CompletableFuture<Integer> next () { switch (state) { case 0 : return inputA.next().thenCompose(a -> { headA = a; return inputB.next(); }).thenCompose(b -> { headB = b; state = 3 ; return next(); }); case 3 : if (headA != null || headB != null ) { if (headA == null || headB != null && headA > headB) { final Integer result = headB; return inputB.next().thenCompose(b -> { headB = b; state = 3 ; return CompletableFuture.completedFuture(result); }); } else { final Integer result = headA; return inputA.next().thenCompose(a -> { headA = a; state = 3 ; return CompletableFuture.completedFuture(result); }); } } else { state = 6 ; return CompletableFuture.completedFuture(null ); } case 6 : throw new IllegalStateException ("Generator has been exhausted!" ); default : throw new AssertionError ("Unreachable!" ); } } }
Refinement
上面我们只用了 thenCompose
,理论上这是 OK 的,但是实际上
CompletableFuture 有上百个方法,最合适的才是坠吼的。
如果仅仅是返回一个值(而非阶段),可以用
thenApply
;
thenCombine
等待两个 CompletableFuture 都完成了再去调用
BiFunction (T, U) -> R
来消费。
思考题:有兴趣的读者可以思考一下 thenCombine
的实现。
整理一下上面的代码,比如这样:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 class Merger implements AsyncIterator <Integer> { enum State { START, ITERATING, DONE } final Stream inputA; final Stream inputB; private State state = State.START; private Integer headA; private Integer headB; public Merger (Stream inputA, Stream inputB) { this .inputA = inputA; this .inputB = inputB; } private CompletableFuture<Integer> next () { switch (state) { case START: return inputA.next().thenCombine(inputB.next(), (a, b) -> { headA = a; headB = b; state = State.ITERATING; return (Void)null ; }).thenCompose(__ -> next()); case ITERATING: if (headA != null || headB != null ) { if (headA == null || headB != null && headA > headB) { final Integer result = headB; return inputB.next().thenApply(b -> { headB = b; return result; }); } else { final Integer result = headA; return inputA.next().thenApply(a -> { headA = a; return result; }); } } else { state = State.DONE; return CompletableFuture.completedFuture(null ); } case DONE: throw new IllegalStateException ("Generator has been exhausted!" ); default : throw new AssertionError ("Unreachable!" ); } } }
总结
任何函数都可以用 CompletableFuture 实现异步化,最通用的方式如下:
在函数里加上 yield
(返回下一个结果)和
await
(等待输入值)来标记断点;
画出控制流图,注意要在 yield
和 await
处断开,断开处标记为状态;
实现一个状态机类,把控制流图中的代码块、状态都无脑填进去,搞定。
这一刻,我们都是(人肉)编译器。