曹耘豪的博客

Java并发之CompletionService

  1. 背景
    1. 传统写法
    2. 如果使用CompletionService
  2. CompletionService原理
  3. 其他使用场景
    1. 取出第一个完成的结果
    2. 限时内取出第一个非null的结果

背景

当并行从有多个数据源同时取数据时,我们可能会写出以下代码,先提交任务再循环get,使用异常进行逻辑判断,很不优雅

传统写法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final long TIMEOUT = 500;
final int TASK_NUM = 5;

List<Future<String>> futures = new ArrayList<>();
for (int i = 0; i < TASK_NUM; i++) {
Future<String> future = executor.submit(callable);
futures.add(future);
}

long start = System.currentTimeMillis();
for (Future<String> future : futures) {
long time = TIMEOUT - (System.currentTimeMillis() - start);
if (time > 0 || future.isDone()) {
try {
String s = future.get(time, TimeUnit.MILLISECONDS);
res.add(s);
} catch (Exception e) {
e.printStackTrace();
}
} else {
future.cancel(true); // 中断超时线程
}
}

如果使用CompletionService

1
2
3
4
5
6
7
8
9
10
11
12
13
CompletionService<String> completionService = new ExecutorCompletionService<>(executor);
for (int i = 0; i < TASK_NUM; i++) {
completionService.submit(callable);
}

long start2 = System.currentTimeMillis();
for (int i = 0; i < 3; i++) {
long time = TIMEOUT - (System.currentTimeMillis() - start2);
if (time <= 0) break;
Future<String> future = completionService.poll(time, TimeUnit.MILLISECONDS);
if (future == null) break; // future为null说明超时
res.add(f.get());
}

CompletionService原理

1
2
3
4
5
6
7
8
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); } // 完成时加入到完成队列
private final Future<V> task;
}

其他使用场景

取出第一个完成的结果

1
2
3
4
5
for (int i = 0; i < TASK_NUM; i++) {
completionService.submit(callable);
}
String result = completionService.take().get();
// use result

限时内取出第一个非null的结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
List<Future<String>> futures4 = new ArrayList<>(TASK_NUM); // 用于中断
for (int i = 0; i < TASK_NUM; i++) {
futures4.add(completionService.submit(callable));
}
long start4 = System.currentTimeMillis(); // 计时

String result4 = null;
for (int i = 0; i < TASK_NUM; i++) {
long time = TIMEOUT - (System.currentTimeMillis() - start4);
if (time <= 0) break;
Future<String> future = completionService.poll(time, TimeUnit.MILLISECONDS);
if (future == null) break; // future为null说明超时,直接退出
String data = future.get();
if (data != null) {
result4 = data;
break;
}
}
for (Future<String> future : futures4) {
future.cancel(true);
}

if (result4 != null) {
// use result
}
   /