博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ExecutorCompletionService源码分析
阅读量:7011 次
发布时间:2019-06-28

本文共 5255 字,大约阅读时间需要 17 分钟。

hot3.png

ExecutorCompletionService实现了接口CompletionService(关于CompletionService的说明可参见),其内部组合了Executor,AbstractExecutorService,BlockingQueue,其中Executor用于执行任务,AbstractExecutorService负责适配返回的FutureTask对象(不太明白组合AbstractExecutorService的好处),BlockingQueue则用于存放已完成任务的Future。看源码:

private final Executor executor;    private final AbstractExecutorService aes;    private final BlockingQueue
> completionQueue;

构造方法

public ExecutorCompletionService(Executor executor) {        if (executor == null)            throw new NullPointerException();        this.executor = executor;        this.aes = (executor instanceof AbstractExecutorService) ?            (AbstractExecutorService) executor : null;        this.completionQueue = new LinkedBlockingQueue
>(); }

根据指定executor创建ExecutorCompletionService对象,内部默认使用LinkedBlockingQueue作为完成队列。

public ExecutorCompletionService(Executor executor,BlockingQueue
> completionQueue) { if (executor == null || completionQueue == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = completionQueue; }

根据指定executor和completionQueue创建ExecutorCompletionService对象,支持定制完成队列。

下面看看ExecutorCompletionService如何包装自己的Future。

private class QueueingFuture extends FutureTask
{ QueueingFuture(RunnableFuture
task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future
task; }

内部类QueueingFuture用于包装Future,当任务完成后自动调用done()将任务返回的Future添加到队列中,done()在FutureTask的finishCompletion()中调用,可自行查看。

同时ExecutorCompletionService定义自己的实现包装任务Runnable和Callable的结果。

//包装Callable的返回结果    private RunnableFuture
newTaskFor(Callable
task) { if (aes == null) return new FutureTask
(task); else //查看AbstractExecutorService中的newTaskFor实现可知,实际就是new FutureTask
(task) //为什么还要组合一个AbstractExecutorService呢,感觉多此一举 return aes.newTaskFor(task); } //包装Runnable private RunnableFuture
newTaskFor(Runnable task, V result) { if (aes == null) return new FutureTask
(task, result);//内部也是使用适配器模式将Runnable包装为Callable else return aes.newTaskFor(task, result); }

submit

下面看看提交任务的实现:

public Future
submit(Callable
task) { if (task == null) throw new NullPointerException(); RunnableFuture
f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; } public Future
submit(Runnable task, V result) { if (task == null) throw new NullPointerException(); RunnableFuture
f = newTaskFor(task, result); executor.execute(new QueueingFuture(f)); return f; }

代码非常简单,具体执行都由Executor代劳。

获取结果

public Future
take() throws InterruptedException { return completionQueue.take(); } public Future
poll() { return completionQueue.poll(); } public Future
poll(long timeout, TimeUnit unit) throws InterruptedException { return completionQueue.poll(timeout, unit); }

提供以上三种获取任务结果的Future,take和poll都会在取得结果后从队列移除元素,不同的是,当队列为空的表现不同,take会阻塞,poll将返回null,poll(long timeout, TimeUnit unit)则会等待指定时间。

以上则是ExecutorCompletionService的全部源码,非常简单。下面看一下ExecutorCompletionService的应用场景:

现在有一组需要并发执行的任务,各任务执行周期不相同,客户端希望完成一个任务则立即取得执行结果。该场景下,使用ExecutorCompletionService即可优雅的实现。这里选用Runnable作为任务,对ExecutorCompletionService来说,Runnable和Callable都一样,Runnable最终也会封装为Callable,代码如下:

public class CompletionServiceTest {    private   static ExecutorService            executor;    private   static int                        numTask=50;    private   static CompletionService
completionService; private static class MyTask implements Runnable{ private String taskName; private MyTask(String taskName){ this.taskName=taskName; } @Override public void run() { long s=System.currentTimeMillis(); try { Thread.sleep(new Random().nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(this.taskName+" finished! The total time is "+(System.currentTimeMillis()-s)+"ms"); } } public static void main(String[] args) { startAll(); try { get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } executor.shutdown();//切记需要关闭线程池,否则,进程不会结束 } private static void startAll(){ if(null==executor){ executor=Executors.newFixedThreadPool(numTask); } if(null==completionService){ completionService=new ExecutorCompletionService
(executor);//使用默认队列 } for(int i=0;i

欢迎指出本文有误的地方,转载请注明原文出处

转载于:https://my.oschina.net/7001/blog/874638

你可能感兴趣的文章
英特尔或推可超频Kaby Lake酷睿i3处理器: 重拾赛扬300A荣光?
查看>>
要想在未来立足 微软等软件公司就必须折本研发硬件
查看>>
QTP使用中的陷阱
查看>>
Cirrus Delaware公司数据中心计划因建设电厂再次受阻
查看>>
前Windows事业部总裁写给CEO和管理者:如何做决策?
查看>>
美国国防部最新报告:美军武器系统可能已经被植入后门
查看>>
Google产品管理副总裁:好产品要不断走出舒适区
查看>>
2016年中国大数据应用将发生质变
查看>>
回忆录:30岁那年,你成长了吗?(上篇)
查看>>
智能家居市场发展困境
查看>>
中芯国际第三财季净利润1.136亿美元
查看>>
关于SaaS和数据恢复的6大谬误
查看>>
调查:95% 的 APT 攻击源起社交网站
查看>>
《Kali Linux渗透测试的艺术》—第2章2.3节安全测试方法论
查看>>
《版式设计——日本平面设计师参考手册》—第1章段落样式和字符样式的应用...
查看>>
《软件工艺师:专业、务实、自豪》一3.7.1 软件工艺峰会
查看>>
《善用佳软:高效能人士的软件应用之道》一2.4 项目管理:免费Project查看软件汇总...
查看>>
Galera 将死 — MySQL Group Replication 发布
查看>>
Mozilla 发现用于中间人攻击的证书
查看>>
Docker 中管理数据 【已翻译100%】
查看>>