之前的文章中讨论过了[[CountDownLatch详解| CountDownLatch]]的使用方法和原理。今天再来讨论一下CyclicBarrier,CyclicBarrier的使用场景其实和CountDownLatch非常相似,用法也非常像,唯一地区别就是,不像CountDownLatch那样用一次就没了,CyclicBarrier可以重复使用。就像它名字的含义:“可以重复使用的栅栏”。
今天将依然分别从它的使用和原理进行讨论。

1. CyclicBarrier的使用

首先看看JavaDoc中给出的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
class Solver {  
final int N;
final float[][] data;
final CyclicBarrier barrier;

class Worker implements Runnable {
int myRow;
Worker(int row) { myRow = row; }
public void run() {
while (!done()) {
processRow(myRow);

try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}

public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
Runnable barrierAction =
new Runnable() { public void run() { mergeRows(...); }};
barrier = new CyclicBarrier(N, barrierAction);

List<Thread> threads = new ArrayList<Thread>(N);
for (int i = 0; i < N; i++) {
Thread thread = new Thread(new Worker(i));
threads.add(thread);
thread.start();
}

// wait until done
for (Thread thread : threads)
thread.join();
}
}
}

以上代码的目的是并行处理二维数组,使用CyclicBarrier进行同步,当所有行处理完,再进行合并操作。从以上示例代码可以看出CyclicBarrier的用法为使用await()。当await()的线程数量满足条件后,则会放行所有await()的线程。在放行之后又await()的将继续阻塞等待下一轮满足条件再放行。如下图:

2. 源码分析

2.1 首先看其成员变量和构造方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class CyclicBarrier {
private static class Generation {
boolean broken = false;
}

/** The lock for guarding barrier entry */
private final ReentrantLock lock = new ReentrantLock();
/** Condition to wait on until tripped */
private final Condition trip = lock.newCondition();

// 参与的线程数
private final int parties;

// 代表越过栅栏之前,要执行相应的操作
private final Runnable barrierCommand;

// 当前所处的“代”
private Generation generation = new Generation();

// 代表还有几个线程没有await()
private int count;

public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

// 还没有到栅栏的线程数,这个值初始为 parties,然后递减
public CyclicBarrier(int parties) {
this(parties, null);
}
}

通过对其成员变量的认识,不难看出CyclicBarrier是基于ReentrantLock和Condition来实现的阻塞和释放,使用parties和count来记录参与的线程数和完成的线程数。引入了“代”的概念来实现栅栏的循环利用。

2.1 await()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
public int await() throws InterruptedException, BrokenBarrierException {  
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}

public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}

private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,TimeoutException {

final ReentrantLock lock = this.lock;
// 先获取锁
lock.lock();
try {
// 拿到当前的代
final Generation g = generation;
// 如果当前代已经被打破了,则抛出异常
if (g.broken)
throw new BrokenBarrierException();

// 如果发生了中断,打破栅栏,抛出中断异常
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}

// index就是即将返回的返回值。表示还剩index个线程未完成
int index = --count;

// 如果index为0,则说明这是最后一个线程,即将通过栅栏。
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
// 做为一个标记,如果执行指定的通过障碍时的command任务是否顺利执行
ranAction = true;
// 如果顺利越过障碍,则开启下一代
nextGeneration();
return 0;
} finally {
// 如果ranAction为false,则说明command在执行的过程中发生了异常。
if (!ranAction)
// 打破栅栏
breakBarrier();
}
}

// 能走到这里,说明当前线程不是最后一个。否则在上面就已经返回了。
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
// 使用Condition的await()进行阻塞。
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
// 当await()抛出中断异常,有两种可能:
// 1. 当前代wait()的时候被中断的,且栅栏没有被打破
if (g == generation && ! g.broken) {
// 打破栅栏,抛出异常
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
// 走到这里,就说明进入下一代后中断的
Thread.currentThread().interrupt();
}
}

// 如果栅栏被打破,抛出异常
if (g.broken)
throw new BrokenBarrierException();

// 如果不是当前代了, 直接返回index放行即可。
if (g != generation)
return index;

// 如果超时,打破栅栏,抛出异常
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}

以上是await()的整体流程,接下来看其中的一些操作的细节:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// 生成新的代
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}


// 打破栅栏
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}

// 重制状态
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // break the current generation
nextGeneration(); // start a new generation
} finally {
lock.unlock();
}
}

// 获取正在等待的线程数
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}

// 参与的线程数
public int getParties() {
return parties;
}

// 栅栏是否被打破
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}

思考:CyclicBarrier中,那些情况下会打破栅栏?

对于这个问题,直接查看那些地方调用了breakBarrier()方法即可。根据源码可知,打破栅栏的情况有以下几种:

  1. 线程被中断;
  2. 超时;
  3. 调用reset()方法。