ExecutorCompletionService实现了接口CompletionService(关于CompletionService的说明可参见),其内部组合了Executor,AbstractExecutorService,BlockingQueue,其中Executor用于执行任务,AbstractExecutorService负责适配返回的FutureTask对象(不太明白组合AbstractExecutorService的好处),BlockingQueue则用于存放已完成任务的Future。看源码:
private final Executor executor; private final AbstractExecutorService aes; private final BlockingQueue> completionQueue;
构造方法
public ExecutorCompletionService(Executor executor) { if (executor == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = new LinkedBlockingQueue>(); }
根据指定executor创建ExecutorCompletionService对象,内部默认使用LinkedBlockingQueue作为完成队列。
public ExecutorCompletionService(Executor executor,BlockingQueue> completionQueue) { if (executor == null || completionQueue == null) throw new NullPointerException(); this.executor = executor; this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null; this.completionQueue = completionQueue; }
根据指定executor和completionQueue创建ExecutorCompletionService对象,支持定制完成队列。
下面看看ExecutorCompletionService如何包装自己的Future。
private class QueueingFuture extends FutureTask{ QueueingFuture(RunnableFuture task) { super(task, null); this.task = task; } protected void done() { completionQueue.add(task); } private final Future task; }
内部类QueueingFuture用于包装Future,当任务完成后自动调用done()将任务返回的Future添加到队列中,done()在FutureTask的finishCompletion()中调用,可自行查看。
同时ExecutorCompletionService定义自己的实现包装任务Runnable和Callable的结果。
//包装Callable的返回结果 private RunnableFuturenewTaskFor(Callable task) { if (aes == null) return new FutureTask (task); else //查看AbstractExecutorService中的newTaskFor实现可知,实际就是new FutureTask (task) //为什么还要组合一个AbstractExecutorService呢,感觉多此一举 return aes.newTaskFor(task); } //包装Runnable private RunnableFuture newTaskFor(Runnable task, V result) { if (aes == null) return new FutureTask (task, result);//内部也是使用适配器模式将Runnable包装为Callable else return aes.newTaskFor(task, result); }
submit
下面看看提交任务的实现:
public Futuresubmit(Callable task) { if (task == null) throw new NullPointerException(); RunnableFuture f = newTaskFor(task); executor.execute(new QueueingFuture(f)); return f; } public Future submit(Runnable task, V result) { if (task == null) throw new NullPointerException(); RunnableFuture f = newTaskFor(task, result); executor.execute(new QueueingFuture(f)); return f; }
代码非常简单,具体执行都由Executor代劳。
获取结果
public Futuretake() throws InterruptedException { return completionQueue.take(); } public Future poll() { return completionQueue.poll(); } public Future poll(long timeout, TimeUnit unit) throws InterruptedException { return completionQueue.poll(timeout, unit); }
提供以上三种获取任务结果的Future,take和poll都会在取得结果后从队列移除元素,不同的是,当队列为空的表现不同,take会阻塞,poll将返回null,poll(long timeout, TimeUnit unit)则会等待指定时间。
以上则是ExecutorCompletionService的全部源码,非常简单。下面看一下ExecutorCompletionService的应用场景:
现在有一组需要并发执行的任务,各任务执行周期不相同,客户端希望完成一个任务则立即取得执行结果。该场景下,使用ExecutorCompletionService即可优雅的实现。这里选用Runnable作为任务,对ExecutorCompletionService来说,Runnable和Callable都一样,Runnable最终也会封装为Callable,代码如下:
public class CompletionServiceTest { private static ExecutorService executor; private static int numTask=50; private static CompletionServicecompletionService; private static class MyTask implements Runnable{ private String taskName; private MyTask(String taskName){ this.taskName=taskName; } @Override public void run() { long s=System.currentTimeMillis(); try { Thread.sleep(new Random().nextInt(1000)); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(this.taskName+" finished! The total time is "+(System.currentTimeMillis()-s)+"ms"); } } public static void main(String[] args) { startAll(); try { get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } executor.shutdown();//切记需要关闭线程池,否则,进程不会结束 } private static void startAll(){ if(null==executor){ executor=Executors.newFixedThreadPool(numTask); } if(null==completionService){ completionService=new ExecutorCompletionService (executor);//使用默认队列 } for(int i=0;i
欢迎指出本文有误的地方,转载请注明原文出处