● 在并发编程中,我们经常用到非阻塞的模型,在之前的多线程的三种实现中,不管是继承thread类还是实现runnable接口,都无法保证获取到之前的执行结果。通过实现Callback接口,并用Future可以来接收多线程的执行结果。
● Future表示一个可能还没有完成的异步任务的结果,针对这个结果可以添加Callback以便在任务执行成功或失败后作出相应的操作。
● Futrue可以监视目标线程调用call的情况,当你调用Future的get()方法以获得结果时,当前线程就开始阻塞,直接call方法结束返回结果。
● cancel,取消Callable的执行,当Callable还没有完成时
● get,获得Callable的返回值
● isCanceled,判断是否取消了
● isDone,判断是否完成
● 创建线程池对象
● 调用submit提交Runnable或Callable对象。
● 如果想要取消一个任务,或如果提交Callable对象,那就要保存好返回的Future对象。
● 当不再提交任何任务时,调用shutdown方法。
● CompletableFuture实现了CompletionStage接口和Future接口,前者是对后者的一个扩展,增加了异步回调、流式处理、多个Future组合处理的能力,使Java在处理多任务的协同工作时更加 顺畅便利。
● 一个completetableFuture就代表了一个任务。他能用Future的方法。还能做一些之前说的executorService配合futures做不了的。
● Future.submit:通常的线程池接口类ExecutorService,其中execute方法的返回值是void,即无法获取异步任务的执行状态,3个重载的submit方法的返回值是Future,可以据此获取任务执行的状态和结果
● supplyAsync / runAsync: supplyAsync表示创建带返回值的异步任务的,相当于ExecutorService submit(Callable<T> task) 方法,runAsync表示创建无返回值的异步任务,相当于ExecutorService submit(Runnable task)方法,这两方法的效果跟submit是一样。这两个方法的带 executor 的变种,表示让任务在指定的线程池中执行,不指定的话,通常任务是在ForkJoinPool.commonPool()线程池中执行的。
● thenApply / thenApplyAsync: thenApply 表示某个任务执行完成后执行的动作,即回调方法,会将该任务的执行结果即方法返回值作为入参传递到回调方法中。
● thenAccept / thenRun: thenAccept 同 thenApply 接收上一个任务的返回值作为参数,但是无返回值;thenRun 的方法没有入参,也没有返回值。
● thenCombine / thenAcceptBoth / runAfterBoth:
这三个方法都是将两个CompletableFuture组合起来,只有这两个都正常执行完了才会执行某个任务,区别在于,thenCombine会将两个任务的执行结果作为方法入参传递到指定方法中,且该方法有返回值;thenAcceptBoth同样将两个任务的执行结果作为方法入参,但是无返回值;runAfterBoth没有入参,也没有返回值。注意两个任务中只要有一个执行异常,则将该异常信息作为指定任务的执行结果。
● allOf / anyOf:
allOf返回的CompletableFuture是多个任务都执行完成后才会执行,只有有一个任务执行异常,则返回的CompletableFuture执行get方法时会抛出异常,如果都是正常执行,则get返回null。
//futureList用于接收子线程返回结果
List<CompletableFuture(Future也可)<String>> futureList = Lists.newArrayList();
for{
futureList.add( testBaidu() );
}
@Async("myExecutor")//方法加入@Asynv代表异步执行
public CompletableFuture<String> testBaidu(){
String body = result.getBody();
return CompletableFuture.completedFuture(body);
}
● Fork/Join它可以将一个大的任务拆分成多个子任务进行并处理,最后将子任务结果合并成最后的计算结果并进行输出
● Fork/Join框架要完成的两件事情:
Fork:把一个复杂任务进行分拆,大事化了
Join:把分拆任务的结果进行合并
● 首先Fork/Join框架需要把大的任务分割成足够小的子任务,如果子任务比较大的话还要对子任务进行继续分割
● 分割的子任务分别放到双端队列里,然后几个启动线程分别从双端队列里获取任务执行,子任务执行完的结果都放在另外一个队列里,启动一个线程从队列里取数据,然后合并数据。
1.ForkJoinTask:
我们要使用Fork/Join框架,首先创建一个ForkJoin任务。该类提供了在任务中执行fork和join的机制。通常情况下我们不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了两个子类
RecursiveAction:用于没有返回结果的任务
RecursiveTask:用于有返回结果的任务
2.ForkJoinPool:
ForkJoinTask需要通过ForkJoinPool来执行
3.RecursiveTask:继承后可以实现递归(自己调自己)调用的任务
三、Fork/Join框架实现原理
ForkJoinPool由ForkJoinTask数组和ForkJoinWorkerThread数组组成
ForkJoinTask数组负责将存放以及将程序提交给ForkJoinPool,而ForkJoinWorkerThread负责执行这些任务
MyForkJoin myForkJoin2 = new MyForkJoin(teachers);
myForkJoin2.fork();
List<Person> join2 = myForkJoin2.join();
1.fork():利用另一个 ForkJoinPool 线程异步执行新创建的子任务
2.join():读取第一个子任务的结果,尚未完成就等待
对一个任务调用 join 方法会阻塞调用方,直到任务结束才做出结果,因此,有必要在两子任务都开始计算后再调用。
四、产生的问题
1.多个子任务因为各种问题,完成时间不一致,从而因为任务分配不均匀而造成资源浪费?
工作窃取:
在 ForkJoinPool 线程池中,每个线程都为分配给它的任务保存一个双向链式队列(Deque)。当前线程,每完成一个任务,就会从队列
头上取出下一个任务开始执行。 然而,因为上述的某些原因,有些工作线程会早早完成任务而空闲下来,有些线程仍在继续工作。此时,
那些闲下来的线程,会随机的从仍在工作的线程的尾部“偷走“”一个任务继续工作下去,直至所有任务全部完成。
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- esig.cn 版权所有 湘ICP备2023023988号-3
违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务