JUC之AQS-CyclicBarrier小结
什么是CyclicBarrier
CyclicBarrier是一个同步工具类,它允许一组线程互相等待,直到到达某个公共屏障点。与CountDownLatch不同的是该barrier在释放等待线程后可以重用,所以称它为循环(Cyclic)的屏障(Barrier)。
CyclicBarrier大致是可循环利用的屏障,顾名思义,这个名字也将这个类的特点给明确地表示出来了。首先,便是可重复利用,说明该类创建的对象可以复用;其次,屏障则体现了该类的原理:每个线程执行时,都会碰到一个屏障,直到所有线程执行结束,然后屏障便会打开,使所有线程继续往下执行。
CyclicBarrier支持一个可选的Runnable命令,在一组线程中的最后一个线程到达之后(但在释放所有线程之前),该命令只在每个屏障点运行一次。若在继续所有参与线程之前更新共享状态,此屏障操作很有用。
看个例子
public class CyclicBarrierDemo {
private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5);
private static final Logger LOGGER = Logger.getLogger("CyclicBarrierDemo");
public static void main(String[] args) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
final int threadNum = i;
Thread.sleep(1000);
exec.execute(() -> {
try {
race(threadNum);
} catch (Exception e) {
LOGGER.warning("execption:" + e);
}
});
}
}
private static void race(int threadNum) throws Exception {
Thread.sleep(1000); // 模拟任务执行1s
LOGGER.info("thread " + threadNum + " [准备]好了");
cyclicBarrier.await();
LOGGER.info("thread " + threadNum + " 继续[执行]了!!!");
}
}
这里我们模拟10个线程执行任务,每个时刻有五个线程会互相等待,均准备好后才开始执行。
线程一个接一个启动了,一旦第5个线程到达了前4个线程到达的地方,那么每个线程都启动了,启动后的输出就没有什么顺序了。这是CycicBarrier一个比较简单的例子。
执行结果为:
七月 15, 2018 1:51:07 上午 com.snowalker.test.aqs.CyclicBarrierDemo race
信息: thread 0 [准备]好了
七月 15, 2018 1:51:08 上午 com.snowalker.test.aqs.CyclicBarrierDemo race
信息: thread 1 [准备]好了
七月 15, 2018 1:51:09 上午 com.snowalker.test.aqs.CyclicBarrierDemo race
信息: thread 2 [准备]好了
七月 15, 2018 1:51:10 上午 com.snowalker.test.aqs.CyclicBarrierDemo race
信息: thread 3 [准备]好了
七月 15, 2018 1:51:11 上午 com.snowalker.test.aqs.CyclicBarrierDemo race
信息: thread 4 [准备]好了
七月 15, 2018 1:51:11 上午 com.snowalker.test.aqs.CyclicBarrierDemo race
信息: thread 4 继续[执行]了!!!
七月 15, 2018 1:51:11 上午 com.snowalker.test.aqs.CyclicBarrierDemo race
信息: thread 3 继续[执行]了!!!
七月 15, 2018 1:51:11 上午 com.snowalker.test.aqs.CyclicBarrierDemo race
信息: thread 2 继续[执行]了!!!
七月 15, 2018 1:51:11 上午 com.snowalker.test.aqs.CyclicBarrierDemo race
信息: thread 1 继续[执行]了!!!
七月 15, 2018 1:51:11 上午 com.snowalker.test.aqs.CyclicBarrierDemo race
信息: thread 0 继续[执行]了!!!
可以看到,每一组线程最后一个到达屏障点之前,线程都处于等待状态,然后才会开始执行。
实现原理
在CyclicBarrier的内部定义了一个Lock对象,每当一个线程调用await方法时,将拦截的线程数减1,然后判断剩余拦截数是否为初始值parties,如果不是,进入Lock对象的条件队列等待。如果是,执行barrierAction对象的Runnable方法,然后将锁的条件队列中的所有线程放入锁等待队列中,这些线程会依次的获取锁、释放锁。
API解析–构造方法
CyclicBarrier(int parties)
创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在每个 barrier 上执行预定义的操作。
CyclicBarrier(int parties, Runnable barrierAction)
创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行
方法摘要
int await()
在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
int await(long timeout, TimeUnit unit)
在所有参与者都已经在此屏障上调用 await 方法之前,将一直等待。
int getNumberWaiting()
返回当前在屏障处等待的参与者数目。
int getParties()
返回要求启动此 barrier 的参与者数目。
boolean isBroken()
查询此屏障是否处于损坏状态。
void reset()
将屏障重置为其初始状态。
CyclicBarrier的dowait方法
dowait方法这个描述很形象,一下子就告诉了我们,我就是让线程抱团停下的方法,有什么事找我吧。
// 这是CyclicBarrier提供给我们调用的API
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen
}
}
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
//毕竟是临界区,安全起见,还是要上锁了,
//每个new出来的CyclicBarrier对象就有一把唯一(final)的重入锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
//CyclicBarrier是可以重用的,它的重用机制就是通过设置一个Generation实现
//当我们希望重用这个CyclicBarrier对象,reset()调用以后
//generation属性就被重新创建。
final Generation g = generation;
//这段代码是想说,所有的线程还没到达同一个地方,
//CyclicBarrier就被“无效”了(broken),那是不正常的,要抛异常
if (g.broken)
throw new BrokenBarrierException();
//线程被设置了中断标签,这是符合突破这个CyclicBarrier的条件了,
//所以,调用breakBarrier()没毛病,但是还是要抛出异常
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
//每次拿到锁的线程走到这里都要对计数count--
//虽然count--有三个步骤要走,但是毕竟上锁了,who cares
int index = --count;
if (index == 0) { // tripped
//走到这里,CyclicBarrier就真的被翻越了
boolean ranAction = false;
try {
//按照文档的说法,我们可以设置一个Runnable
//在CyclicBarrier被翻越的时候执行
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
//设置新的Generation已提供再用。
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
//没有超时,调用ConditionObject的await或者awaitNanos方法,当前线程被阻塞
//至于拿到的锁,会在await或者awaitNanos方法里面释放
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
//以下代码和上面说的一样,总结一下就是
//如果Thread的中断状态位被设置了就抛出InterruptedException
//或者在还有线程等待的时候CyclicBarrier被翻越BrokenBarrierException
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
//等待超时,也要抛异常,这次是TimeoutException
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
//解锁好习惯
lock.unlock();
}
}
CountDownLatch与CyclicBarrier的区别
CountDownLatch通过将await()方法和countDown()方法在不同线程组分别调用,从而实现线程组间的线程等待,即一个线程组等待另一个线程组执行结束再执行。而CyclicBarrier类则是通过调用await()方法实现线程组内的线程等待,即达到需要拦截的线程数,被拦截的线程才会依次获取锁,释放锁。
CountDownLatch是线程组之间的等待,即一个(或多个)线程等待N个线程完成某件事情之后再执行;而CyclicBarrier则是线程组内的等待,即每个线程相互等待,即N个线程都被拦截之后,然后依次执行。
CountDownLatch是减计数方式,而CyclicBarrier是加计数方式。
CountDownLatch计数为0无法重置,而CyclicBarrier计数达到初始值,则可以重置。
CountDownLatch不可以复用,而CyclicBarrier可以复用。
小结
总结起来就是,要想让规定数量的线程都达到同一个点才开始执行,就得让线程等待,计数。
AQS的条件队列和同步队列的设计正是用的这种思想,线程等待,就加入条件队列,要释放线程,因为还要获取锁才能越过CyclicBarrier的await方法,所以要加入同步队列获取锁。