java completablefuture 详解-mile米乐体育
future是java 5添加的类,用来描述一个异步计算的结果。你可以使用isdone
方法检查计算是否完成,或者使用get
阻塞住调用线程,直到计算完成返回结果,你也可以使用cancel
方法停止任务的执行。
public class basicfuture { public static void main(string[] args) throws executionexception, interruptedexception { executorservice es = executors.newfixedthreadpool(10); futuref = es.submit(() ->{ // 长时间的异步计算 // …… // 然后返回结果 return 100; }); // while(!f.isdone()) // ; f.get(); } }
虽然future
以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的cpu资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式当计算结果完成及时通知监听者呢?
很多语言,比如node.js,采用回调的方式实现异步编程。java的一些框架,比如netty,自己扩展了java的 future
接口,提供了addlistener
等多个扩展方法:
channelfuture future = bootstrap.connect(new inetsocketaddress(host, port)); future.addlistener(new channelfuturelistener() { @override public void operationcomplete(channelfuture future) throws exception { if (future.issuccess()) { // success } else { // failure } } });
google guava也提供了通用的扩展future:listenablefuture、settablefuture 以及辅助类futures等,方便异步编程。
final string name = ...; inflight.add(name); listenablefuturefuture = service.query(name); future.addlistener(new runnable() { public void run() { processedcount.incrementandget(); inflight.remove(name); lastprocessed.set(name); logger.info("done with {0}", name); } }, executor);
scala也提供了简单易用且功能强大的future/promise异步编程模式。
作为正统的java类库,是不是应该做点什么,加强一下自身库的功能呢?
在java 8中, 新增加了一个包含50个方法左右的类: completablefuture,提供了非常强大的future的扩展功能,可以帮助我们简化异步编程的复杂性,提供了函数式编程的能力,可以通过回调的方式处理计算结果,并且提供了转换和组合completablefuture的方法。
下面我们就看一看它的功能吧。
主动完成计算
completablefuture类实现了completionstage和future接口,所以你还是可以像以前一样通过阻塞或者轮询的方式获得结果,尽管这种方式不推荐使用。
public t get() public t get(long timeout, timeunit unit) public t getnow(t valueifabsent) public t join()
getnow
有点特殊,如果结果已经计算完则返回结果或者抛出异常,否则返回给定的valueifabsent
值。
join
返回计算的结果或者抛出一个unchecked异常(completionexception),它和get
对抛出的异常的处理有些细微的区别,你可以运行下面的代码进行比较:
completablefuturefuture = completablefuture.supplyasync(() -> { int i = 1/0; return 100; }); //future.join(); future.get();
尽管future可以代表在另外的线程中执行的一段异步代码,但是你还是可以在本身线程中执行:
public static completablefuturecompute() { final completablefuture future = new completablefuture<>(); return future; }
上面的代码中future
没有关联任何的callback
、线程池、异步任务等,如果客户端调用future.get
就会一致傻等下去。你可以通过下面的代码完成一个计算,触发客户端的等待:
f.complete(100);
当然你也可以抛出一个异常,而不是一个成功的计算结果:
f.completeexceptionally(new exception());
完整的代码如下:
public class basicmain { public static completablefuturecompute() { final completablefuture future = new completablefuture<>(); return future; } public static void main(string[] args) throws exception { final completablefuture f = compute(); class client extends thread { completablefuture f; client(string threadname, completablefuture f) { super(threadname); this.f = f; } @override public void run() { try { system.out.println(this.getname() ": " f.get()); } catch (interruptedexception e) { e.printstacktrace(); } catch (executionexception e) { e.printstacktrace(); } } } new client("client1", f).start(); new client("client2", f).start(); system.out.println("waiting"); f.complete(100); //f.completeexceptionally(new exception()); system.in.read(); } }
可以看到我们并没有把f.complete(100);
放在另外的线程中去执行,但是在大部分情况下我们可能会用一个线程池去执行这些异步任务。completablefuture.complete()
、completablefuture.completeexceptionally
只能被调用一次。但是我们有两个后门方法可以重设这个值:obtrudevalue
、obtrudeexception
,但是使用的时候要小心,因为complete
已经触发了客户端,有可能导致客户端会得到不期望的结果。
创建completablefuture对象。
completablefuture.completedfuture
是一个静态辅助方法,用来返回一个已经计算好的completablefuture
。
public static completablefuture completedfuture(u value)
而以下四个静态方法用来为一段异步执行的代码创建completablefuture
对象:
public static completablefuturerunasync(runnable runnable) public static completablefuture runasync(runnable runnable, executor executor) public static completablefuture supplyasync(supplier supplier) public static completablefuture supplyasync(supplier supplier, executor executor)
以async
结尾并且没有指定executor
的方法会使用forkjoinpool.commonpool()
作为它的线程池执行异步代码。
runasync
方法也好理解,它以runnable
函数式接口类型为参数,所以completablefuture
的计算结果为空。
supplyasync
方法以supplier
函数式接口类型为参数,completablefuture
的计算结果类型为u
。
因为方法的参数类型都是函数式接口,所以可以使用lambda表达式实现异步任务,比如:
completablefuturefuture = completablefuture.supplyasync(() -> { //长时间的计算任务 return "·00"; });
计算结果完成时的处理
当completablefuture
的计算结果完成,或者抛出异常的时候,我们可以执行特定的action
。主要是下面的方法:
public completablefuturewhencomplete(biconsumer action) public completablefuture whencompleteasync(biconsumer action) public completablefuture whencompleteasync(biconsumer action, executor executor) public completablefuture exceptionally(function fn)
可以看到action
的类型是biconsumer
,它可以处理正常的计算结果,或者异常情况。
方法不以async
结尾,意味着action
使用相同的线程执行,而async
可能会使用其它的线程去执行(如果使用相同的线程池,也可能会被同一个线程选中执行)。
注意这几个方法都会返回completablefuture
,当action
执行完毕后它的结果返回原始的completablefuture
的计算结果或者返回异常。
public class main { private static random rand = new random(); private static long t = system.currenttimemillis(); static int getmoredata() { system.out.println("begin to start compute"); try { thread.sleep(10000); } catch (interruptedexception e) { throw new runtimeexception(e); } system.out.println("end to start compute. passed " (system.currenttimemillis() - t)/1000 " seconds"); return rand.nextint(1000); } public static void main(string[] args) throws exception { completablefuturefuture = completablefuture.supplyasync(main::getmoredata); future f = future.whencomplete((v, e) -> { system.out.println(v); system.out.println(e); }); system.out.println(f.get()); system.in.read(); } }
exceptionally
方法返回一个新的completablefuture,当原始的completablefuture抛出异常的时候,就会触发这个completablefuture的计算,调用function计算值,否则如果原始的completablefuture正常计算完后,这个新的completablefuture也计算完成,它的值和原始的completablefuture的计算的值相同。也就是这个exceptionally
方法用来处理异常的情况。
下面一组方法虽然也返回completablefuture对象,但是对象的值和原来的completablefuture计算的值不同。当原先的completablefuture的值计算完成或者抛出异常的时候,会触发这个completablefuture对象的计算,结果由bifunction
参数计算而得。因此这组方法兼有whencomplete
和转换的两个功能。
public completablefuture handle(bifunction fn) public completablefuture handleasync(bifunction fn) public completablefuture handleasync(bifunction fn, executor executor)
同样,不以async
结尾的方法由原来的线程计算,以async
结尾的方法由默认的线程池forkjoinpool.commonpool()
或者指定的线程池executor
运行。
转换
completablefuture
可以作为monad(单子)和functor。由于回调风格的实现,我们不必因为等待一个计算完成而阻塞着调用线程,而是告诉completablefuture
当计算完成的时候请执行某个function
。而且我们还可以将这些操作串联起来,或者将completablefuture
组合起来。
public completablefuture thenapply(function fn) public completablefuture thenapplyasync(function fn) public completablefuture thenapplyasync(function fn, executor executor)
这一组函数的功能是当原来的completablefuture计算完后,将结果传递给函数fn
,将fn
的结果作为新的completablefuture
计算结果。因此它的功能相当于将completablefuture
转换成completablefuture
。
这三个函数的区别和上面介绍的一样,不以async
结尾的方法由原来的线程计算,以async
结尾的方法由默认的线程池forkjoinpool.commonpool()
或者指定的线程池executor
运行。java的completablefuture类总是遵循这样的原则,下面就不一一赘述了。
使用例子如下:
completablefuturefuture = completablefuture.supplyasync(() -> { return 100; }); completablefuture f = future.thenapplyasync(i -> i * 10).thenapply(i -> i.tostring()); system.out.println(f.get()); //"1000"
需要注意的是,这些转换并不是马上执行的,也不会阻塞,而是在前一个stage完成后继续执行。
它们与handle
方法的区别在于handle
方法会处理正常计算值和异常,因此它可以屏蔽异常,避免异常继续抛出。而thenapply
方法只是用来处理正常值,因此一旦有异常就会抛出。
纯消费(执行action)
上面的方法是当计算完成的时候,会生成新的计算结果(thenapply
, handle
),或者返回同样的计算结果whencomplete
,completablefuture
还提供了一种处理结果的方法,只对结果执行action
,而不返回新的计算值,因此计算值为void
:
public completablefuturethenaccept(consumer action) public completablefuture thenacceptasync(consumer action) public completablefuture thenacceptasync(consumer action, executor executor)
看它的参数类型也就明白了,它们是函数式接口consumer
,这个接口只有输入,没有返回值。
completablefuturefuture = completablefuture.supplyasync(() -> { return 100; }); completablefuture f = future.thenaccept(system.out::println); system.out.println(f.get());
thenacceptboth
以及相关方法提供了类似的功能,当两个completionstage都正常完成计算的时候,就会执行提供的action
,它用来组合另外一个异步的结果。
runafterboth
是当两个completionstage都正常完成计算的时候,执行一个runnable,这个runnable并不使用计算的结果。
public completablefuturethenacceptboth(completionstage other, biconsumer action) public completablefuture thenacceptbothasync(completionstage other, biconsumer action) public completablefuture thenacceptbothasync(completionstage other, biconsumer action, executor executor) public completablefuture runafterboth(completionstage other, runnable action)
例子如下:
completablefuturefuture = completablefuture.supplyasync(() -> { return 100; }); completablefuture f = future.thenacceptboth(completablefuture.completedfuture(10), (x, y) -> system.out.println(x * y)); system.out.println(f.get());
更彻底地,下面一组方法当计算完成的时候会执行一个runnable,与thenaccept
不同,runnable并不使用completablefuture计算的结果。
public completablefuturethenrun(runnable action) public completablefuture thenrunasync(runnable action) public completablefuture thenrunasync(runnable action, executor executor
因此先前的completablefuture计算的结果被忽略了,这个方法返回completablefuture
类型的对象。
completablefuturefuture = completablefuture.supplyasync(() -> { return 100; }); completablefuture f = future.thenrun(() -> system.out.println("finished")); system.out.println(f.get());
因此,你可以根据方法的参数的类型来加速你的记忆。
runnable
类型的参数会忽略计算的结果,consumer
是纯消费计算结果,biconsumer
会组合另外一个completionstage
纯消费,function
会对计算结果做转换,bifunction
会组合另外一个completionstage
的计算结果做转换。
组合
public completablefuture thencompose(function> fn) public completablefuture thencomposeasync(function> fn) public completablefuture thencomposeasync(function> fn, executor executor
这一组方法接受一个function作为参数,这个function的输入是当前的completablefuture的计算值,返回结果将是一个新的completablefuture,这个新的completablefuture会组合原来的completablefuture和函数返回的completablefuture。因此它的功能类似:
a --> b ---> c
记住,thencompose
返回的对象并不一是函数fn
返回的对象,如果原来的completablefuture
还没有计算出来,它就会生成一个新的组合后的completablefuture。
例子:
completablefuturefuture = completablefuture.supplyasync(() -> { return 100; }); completablefuture f = future.thencompose( i -> { return completablefuture.supplyasync(() -> { return (i * 10) ""; }); }); system.out.println(f.get()); //1000
而下面的一组方法thencombine
用来复合另外一个completionstage的结果。它的功能类似:
a | ------> c ------^ b
两个completionstage是并行执行的,它们之间并没有先后依赖顺序,other
并不会等待先前的completablefuture
执行完毕后再执行。
public completablefuturethencombine(completionstage other, bifunction fn) public completablefuture thencombineasync(completionstage other, bifunction fn) public completablefuture thencombineasync(completionstage other, bifunction fn, executor executor)
其实从功能上来讲,它们的功能更类似thenacceptboth
,只不过thenacceptboth
是纯消费,它的函数参数没有返回值,而thencombine
的函数参数fn
有返回值。
completablefuturefuture = completablefuture.supplyasync(() -> { return 100; }); completablefuture future2 = completablefuture.supplyasync(() -> { return "abc"; }); completablefuture f = future.thencombine(future2, (x,y) -> y "-" x); system.out.println(f.get()); //abc-100
either
thenacceptboth
和runafterboth
是当两个completablefuture都计算完成,而我们下面要了解的方法是当任意一个completablefuture计算完成的时候就会执行。
public completablefutureaccepteither(completionstage other, consumer action) public completablefuture accepteitherasync(completionstage other, consumer action) public completablefuture accepteitherasync(completionstage other, consumer action, executor executor) public completablefuture applytoeither(completionstage other, function fn) public completablefuture applytoeitherasync(completionstage other, function fn) public completablefuture applytoeitherasync(completionstage other, function fn, executor executor)
accepteither
方法是当任意一个completionstage完成的时候,action
这个消费者就会被执行。这个方法返回completablefuture
applytoeither
方法是当任意一个completionstage完成的时候,fn
会被执行,它的返回值会当作新的completablefuture
的计算结果。
下面这个例子有时会输出100
,有时候会输出200
,哪个future先完成就会根据它的结果计算。
random rand = new random(); completablefuturefuture = completablefuture.supplyasync(() -> { try { thread.sleep(10000 rand.nextint(1000)); } catch (interruptedexception e) { e.printstacktrace(); } return 100; }); completablefuture future2 = completablefuture.supplyasync(() -> { try { thread.sleep(10000 rand.nextint(1000)); } catch (interruptedexception e) { e.printstacktrace(); } return 200; }); completablefuture f = future.applytoeither(future2,i -> i.tostring());
辅助方法 allof
和 anyof
前面我们已经介绍了几个静态方法:completedfuture
、runasync
、supplyasync
,下面介绍的这两个方法用来组合多个completablefuture。
public static completablefutureallof(completablefuture... cfs) public static completablefuture