CompletableFuture 也没有那么废柴嘛!

我们知道,Java 里把 Promise 叫作 CompletableFuture,相比那个只能用于线程同步的 Future,CompletableFuture 新增了很多方法用于串联异步事件,比如常用的一些:

  • thenApply:拿到结果后对其 apply 一个函数,返回一个新的值 thenApply(T -> R)
  • thenCompose:拿到结果后其 apply 一个函数,返回一个新的 CompletableFuture thenCompose(T -> CompletableFuture<R>)
  • thenAccept:拿到结果后消费它,不需要返回结果 thenAccept(Consumer<T>)

如果不引入任何第三方库,CompletableFuture 仍是目前 Java 上最好的异步编程方式。之前一直觉得这个东西难用,直到我想明白一件事,证明了 CompletableFuture 虽然麻烦了点但是能做任何事情,然后用它的时候心里就没有这么膈应了。

本文会以一个例子来讲解:如何把任意函数转换成异步调用风格。用不用 CompletableFuture 倒是其次的,这项技术更多的是在于编程本身。

这篇文章不会谈论太多 CompletableFuture 的用法,你可以参考 Javadoc 或者这篇文章

证明 CompletableFuture 是足够的

首先来(极不严谨地)说明一件事情,为什么 CompletableFuture 是足够用的,换句话说,证明 CompletableFuture 能表达一切计算流程

如果你有一些函数式编程的基础,比如会一点 Haskell,这就是一句话的事情:CompletableFuture 其实是一个 Monad —— 因为它的 thenCompose 实现了 Monad 的 >>= 操作符。既然 Monad 能用来表示任何计算过程,CompletableFuture 当然也能。

1
2
3
4
5
6
class Applicative m => Monad (m :: * -> *) where
(>>=) :: m a -> (a -> m b) -> m b -- thenCompose 实现了它
(>>) :: m a -> m b -> m b
return :: a -> m a
fail :: String -> m a
{-# MINIMAL (>>=) #-} -- 这是在说:只要实现 (>>=) 就够了

其实想想也很明白,Monad 表示一个带 context 的计算过程,比如可能抛异常之类的(纯函数是不会抛异常的)。CompletableFuture 也一样,他包裹一串计算过程并且处理异常。

如果看不懂上面的也没关系,我们用另一种方式再说明一下:

任何程序的流程控制都可以用 ifgoto 来组合起来。无论是 for 还是 while 循环,desurge 之后不过就是 ifgoto 的组合。通过 thenCompose 就可以表达 ifgoto

这里说的不够严谨,其实 if 也是 surge,最终会变成条件跳转指令。

1
2
3
4
5
6
7
cf.thenCompose(v -> {
if (v < 100) {
return doStage1(); // doStage1() 返回一个 CompletableFuture,决定下一步做什么,相当于 goto
} else {
return doStage2(); // 同上
}
})

你看这个例子,ifgoto 都有了,所以无论程序的控制流多复杂,我们都能组合出来。怎么组合?别急,下面我们就来讲这个。

CompletableFuture in Practice

我们从一个普通的函数开始。考虑到复杂性和完整性,我们用 Merge 2 Sorted Streams 作为演示,如果你不清楚这个是干嘛的,可以先做一下这道算法题

下面是最普通的实现,输入两个数组,输出一个数组:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
Stream merge(Stream inputA, Stream inputB) {
List<Integer> results = new ArrayList<>();
Integer headA = inputA.next();
Integer headB = inputB.next();
while (headA != null || headB != null) {
if (headA == null || headB != null && headA > headB) {
results.add(headB);
headB = inputB.next();
} else {
results.add(headA);
headA = inputA.next();
}
}
return new Stream(results);
}

class Stream {
private final Queue<Integer> numbers;
public Stream(List<Integer> numbers) { this.numbers = new LinkedList<>(numbers); }
public Integer { return numbers.poll(); }
}

这个实现有什么问题呢?作为算法足够 OK。但是从工程意义上说,如果输入的 Stream 很大,包含 million 级的元素,那更好的方式是把 Stream 的输入输出作为 Iterator,只在 next() 的时候计算下一个需要的元素。这样内存占用是常数级的,完全不用担心数据量过大呢!

为了看清一步一步的变化过程,我们先假装 Java 有 Generator 语法。标记为 Generator 的函数不再是一个函数,而是类似一个 Iterator。一旦调用 next(),“函数”代码运行到 yield 返回一个值,然后函数似乎停在了这里。下次 next(),“函数”又接着刚刚的地方运行。

如果有 Generator 的话,函数应该长下面这样,注意 [yield]:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Stream merge(Stream inputA, Stream inputB) {
Integer headA = inputA.next();
Integer headB = inputB.next();
while (headA != null || headB != null) {
if (headA == null || headB != null && headA > headB) {
[yield] headB;
headB = inputB.next();
} else {
[yield] headA;
headA = inputA.next();
}
}
[yield] null;
}

哇,这个函数几乎没有改动,真是太方便了!(然而并没有卵用)

Function → Iterator

现在我们回到现实:Java 并没有 Generator 语法,所以我们要人肉实现一个 Generator。

为了通用性,首先做一个 desurge,把 while 循环改成 ifgoto 的组合,这太简单了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Stream merge(Stream inputA, Stream inputB) {
Integer headA = inputA.next();
Integer headB = inputB.next();
WHILE_LOOP:
if (headA != null || headB != null) {
if (headA == null || headB != null && headA > headB) {
[yield] headB;
headB = inputB.next();
} else {
[yield] headA;
headA = inputA.next();
}
goto WHILE_LOOP; // again,假设 Java 也有 goto
}
[yield] null;
}

下一步是去掉 yield,刚刚说到 Generator 的每次 next() 似乎会让函数停在一个地方,如何实现停在一个地方?记下来呗!加一个标记状态的变量,这个状态会告诉我下次 next() 的时候从哪里继续运行。

首先画出函数的控制流图,然后做一件事:想象所有的 yield 之后都有一个断点,我们在断点处切开,标记它为某个 State,这样下次 next() 的时候就能从断点继续。

下图的 S0 ~ S2 是我标记好的断点,S0 就是起始位置,S1 是两个 yield result 之后断下来的地方(恰好是同一个地方),S2 是 yield null 之后断下来的地方。

flow-graph -1-

我们按照图中的 State 标记机械地把它切开,就得到了下面这个类,它就是由 merge() 变换得到的 Generator:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class Merger implements Iterator<Integer> { 
// Arguments
final Iterator inputA;
final Iterator inputB;

// Internal states
private int state = 0; // 我们加上的状态变量
private Integer headA; // 变换前的局部变量,因为跨了多次 next() 调用,不能再是局部变量了
private Integer headB; // 同上

public Merger(Iterator inputA, Iterator inputB) {
this.inputA = inputA;
this.inputB = inputB;
}

public Integer next() {
for (;;) { // 这个循环是有用的,往下看几行
switch (state) {
case 0:
headA = inputA.next();
headB = inputB.next();
state = 1;
break; // 这里就用上了外层的循环
case 1:
if (headA != null || headB != null) {
if (headA == null || headB != null && headA > headB ) {
final int result = headB;
headB = inputB.next();
state = 1; // 可以省略
return result; // 变换前是 yield result
} else {
final int result = headA;
headA = inputA.next();
state = 1; // 可以省略
return result; // 变换前是 yield result
}
} else {
state = 2;
return null; // 变换前是 yield null
}
case 2:
// Generator 已经终结了(变换前:函数已经走到底了)
throw new IllegalStateException("Generator has been exhausted!");
default:
throw new AssertionError("Unreachable!");
}
}
}
}

别急,最后我们会简化这些充满废话的代码。

阶段性总结一下:到现在为止,我们做了一件伟大的事情——把一个函数变成了 Iterator,函数已经不再是函数,而是一个状态机,这个状态记录了下次调用 next() 需要从哪继续

套用一下术语:“从哪继续”就是 Continuation,把 Continuation 搞出来的这个过程称为 CPS 变换

Iterator → AsyncIterator

呃…… 说好的 CompletableFuture 呢?离 CompletableFuture 只有一步之遥了!

先从接口下手。想象两个 Stream Input 都是从 IO 拿到的数据,所以每次 next() 其实背后都是一次 IO,应该把它用 CompletableFuture 包成异步的,接口大概长这样:

1
2
3
interface AsyncIterator<T> {
CompletableFuture<T> next();
}

类似刚刚引入 Generator 一样,我们再假装有 await 关键字。await 关键字表示异步地等待结果返回,有了它,函数就魔法般的暂停在等待异步 IO 的地方:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Stream merge(Stream inputA, Stream inputB) {
Integer headA = inputA.next();
Integer headB = inputB.next();
WHILE_LOOP:
if (headA != null || headB != null) {
if (headA == null || headB != null && headA > headB) {
Integer result = headB;
headB = [await] inputB.next(); // await 会魔法般地等待 next() 完成再继续运行
[yield] result;
} else {
Integer result = headA;
headA = [await] inputA.next();
[yield] result;
}
goto WHILE_LOOP;
}
[yield] null;
}

因为 await 也会暂停这个“函数”,所以和刚刚对 yield 的处理一样,我们想象 await 这里有一个断点,我们也要为它设置 State 标记:

flow-graph -1-

糟糕!这状态数有点多啊!好在 Java 8 提供了 Lambda 表达式,和 CompletableFuture 搭配食用口味更佳。图中的大多数状态都可以借助 Lambda 表达式来实现,节约了不少代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class Merger implements AsyncIterator<Integer> {
// Arguments
final Stream inputA;
final Stream inputB;

// Internal states
private int state = 0;
private Integer headA;
private Integer headB;

public Merger(Stream inputA, Stream inputB) {
this.inputA = inputA;
this.inputB = inputB;
}

public CompletableFuture<Integer> next() {
switch (state) {
case 0:
return inputA.next().thenCompose(a -> { // State 1 在这里!
headA = a;
return inputB.next();
}).thenCompose(b -> { // State 2 在这里!
headB = b;
state = 3;
return next(); // 相当于原来的外层循环
});
case 3:
if (headA != null || headB != null) {
if (headA == null || headB != null && headA > headB) {
final Integer result = headB;
return inputB.next().thenCompose(b -> { // State 4 在这里!
headB = b;
state = 3; // 可以省略
return CompletableFuture.completedFuture(result);
});
} else {
final Integer result = headA;
return inputA.next().thenCompose(a -> { // State 5 在这里!
headA = a;
state = 3; // 可以省略
return CompletableFuture.completedFuture(result);
});
}
} else {
state = 6;
return CompletableFuture.completedFuture(null);
}
case 6:
throw new IllegalStateException("Generator has been exhausted!");
default:
throw new AssertionError("Unreachable!");
}
}
}

Refinement

上面我们只用了 thenCompose,理论上这是 OK 的,但是实际上 CompletableFuture 有上百个方法,最合适的才是坠吼的。

  • 如果仅仅是返回一个值(而非阶段),可以用 thenApply
  • thenCombine 等待两个 CompletableFuture 都完成了再去调用 BiFunction (T, U) -> R 来消费。

思考题:有兴趣的读者可以思考一下 thenCombine 的实现。

整理一下上面的代码,比如这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
class Merger implements AsyncIterator<Integer> {
// States
enum State { START, ITERATING, DONE }

// Arguments
final Stream inputA;
final Stream inputB;

// Internal states
private State state = State.START;
private Integer headA;
private Integer headB;

public Merger(Stream inputA, Stream inputB) {
this.inputA = inputA;
this.inputB = inputB;
}

private CompletableFuture<Integer> next() {
switch (state) {
case START:
// 这里做了小小的优化:这两个 next() 可以并行等待
return inputA.next().thenCombine(inputB.next(), (a, b) -> {
headA = a;
headB = b;
state = State.ITERATING;
return (Void)null;
}).thenCompose(__ -> next());
case ITERATING:
if (headA != null || headB != null) {
if (headA == null || headB != null && headA > headB) {
final Integer result = headB;
return inputB.next().thenApply(b -> { // thenCompose 某个值 <=> thenApply
headB = b;
return result;
});
} else {
final Integer result = headA;
return inputA.next().thenApply(a -> { // 同上
headA = a;
return result;
});
}
} else {
state = State.DONE;
return CompletableFuture.completedFuture(null);
}
case DONE:
throw new IllegalStateException("Generator has been exhausted!");
default:
throw new AssertionError("Unreachable!");
}
}
}

总结

任何函数都可以用 CompletableFuture 实现异步化,最通用的方式如下:

  1. 在函数里加上 yield(返回下一个结果)和 await(等待输入值)来标记断点;
  2. 画出控制流图,注意要在 yieldawait 处断开,断开处标记为状态;
  3. 实现一个状态机类,把控制流图中的代码块、状态都无脑填进去,搞定。

这一刻,我们都是(人肉)编译器。