《JUC》CyclicBarrier原理/源码解析

网友投稿 386 2022-11-10

《JUC》CyclicBarrier原理/源码解析

《JUC》CyclicBarrier原理/源码解析

一、概述

1、作用?

允许一组线程互相等待,直到到达某个公共屏障(barrier)点。因为该barrier在释放等待线程后可以重用,所以称它为循环的barrier。

2、使用场景?

用于多线程计算数据,最后合并计算结果的场景。

3、常用类方法?

await():告诉CyclicBarrier,线程已经到达了屏障,计数减一;然后阻塞线程,直到count为0。reset():重置CyclicBarrier的未加入到party的数量count和当前代Generation。

4、案例

public class CyclicBarrierTest { public static void main(String[] args) throws Exception { CyclicBarrier cyclicBarrier = new CyclicBarrier(2); new Thread(() -> { System.out.println("work thread start + 1"); try { TimeUnit.SECONDS.sleep(1); cyclicBarrier.await(); // 睡一会,让主线程先干活,work thread再继续 TimeUnit.MILLISECONDS.sleep(100); System.out.println("main thread OK, work thread go on!"); } catch (Exception e) { e.printStackTrace(); } }, "work-thread").start(); // 主线程在这等着,等其他线程调用await()方法之后,大家再一起执行 cyclicBarrier.await(); System.out.println("main end"); }}

二、原理

1)CyclicBarrier中一个generation代表了每一代,通过这个实现CyclicBarrier的复用。parties变量用来表示参与party的线程数;count变量代表了还没到party的线程数;外加个ReentrantLock锁和一个Condition条件变量实现线程的并发和阻塞。2)在CyclicBarrier类的内部有一个计数器count,每个线程在到达屏障点的时候都会调用​​await()​​​方法将自己阻塞排队,并将计数器count减1; 3)当计数器count减为0的时候,所有因调用​​​await()​​​方法而被​​阻塞的线程​​​将被​​唤醒​​​。 4)线程的​​​排队​​​进入party​​通过ReentrantLock实现​​​;进入party后​​睡眠​​​等待所有参会者​​通过锁的条件等待Condition实现​​。

三、源码解析

1、成员变量和构造器

CyclicBarrier内部是通过条件队列​​trip​​对线程进行阻塞;两个int型的变量​​parties​​​和​​count​​:​​parties​​表示每次拦截的线程数,即party的​​所有参会者​​;​​count​​表示还未拦截的线程数,即​​还有多少参会者没到party​​;它的初始值和parties相同,调用​​await()​​方法减1,减为0时将所有阻塞在条件变量上线程唤醒。静态内部类​​Generation​​​,代表​​栅栏的当前代​​​,就像玩游戏时代表的本局游戏,利用它可以​​实现循环等待​​。​​barrierCommand​​​,表示换代前执行的任务。当前代结束会执行该任务,然后​​自动开启下一代​​。

public class CyclicBarrier { // 该类用于CyclicBarrier的一次协同是否完成(正常完成、异常完成) // reset后会复用该结构,每一次的party都会生成一个该类的新实例 private static class Generation { // 当前party有没有被强制中断,false表示没有 boolean broken = false; } // 同步锁,线程进入 条件等待 时需要获取锁 private final ReentrantLock lock = new ReentrantLock(); // 用于阻塞线程的条件变量(有未到party的线程,已到party的线程则等待在该条件变量上) private final Condition trip = lock.newCondition(); // 参与party的人数 private final int parties; // 当所有的线程都参与到了party中, 回调的方法 private final Runnable barrierCommand; // 当前party private Generation generation = new Generation(); // 还未到party的线程数 private int count; // 构造器 public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }}

2、await()方法

CyclicBarrier的核心方法是​​await()​​​,该方法是线程相互等待的关键,它有两种实现:一种是带等待超时的,一种是不带等待超时,本质上都是调用了同一个方法​​dowait()​​,只是带等待超时的多传了一个时间。

public int await() throws InterruptedException, BrokenBarrierException { try { // 直接调用自己的dowait()方法 return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen }}

CyclicBarrier#dowait()方法:

首先因为await()是同步的,需要先加互斥锁ReentrantLock;每次进来都将count减1,减完立马进行判断看看count是否等于0:如果等于0,则执行​​换代前要执行的任务barrierCommand​​,然后​​唤醒所有阻塞等待的线程​​,接着​​自动进入​​CyclicBarrier的​​下一代​​;将计数器​​count​​的值重新设为parties。如果​​barrierCommand运行异常​​,则​​打破栅栏的当前代​​,唤醒所有阻塞等待的线程。count不等于0,这进入for循环:​​不是超时等待​​,直接调用​​Condition.await()阻塞​​当前线程。​​是超时等待​​,就在nanos时间内循环竞争锁;如果当前线程在​​await()​​获取锁过程中被中断了:在当前代还没结束之前打破栅栏,即游戏在中途被打断,则设置generation的broken状态为true并唤醒所有线程。当前代已经结束,则直接中断当前线程。​​线程被唤醒后​​进行下面三个判断:如果线程因为​​broken generation​​操作(即调用breakBarrier()方法)而​​被唤醒则抛出异常​​;如果线程因为CyclicBarrier​​正常换代被唤醒​​,则​​返回计数器count的值​​;如果线程因为​​超时而被唤醒打破栅栏并抛出TimeOut异常​​。​​注意​​:如果其中有​​一个线程因为等待超时而退出​​,那么整盘游戏也会结束,​​其他线程都会被唤醒​​。最后解锁。

private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException,TimeoutException { // 上锁 final ReentrantLock lock = this.lock; lock.lock(); try { // 保存当前party时的generation快照,generation更新后不会影响这里 final Generation g = generation; // 当party被强制中断时,抛出异常 if (g.broken) throw new BrokenBarrierException(); // 判断当前线程是否被中断 if (Thread.interrupted()) { // 有中断的线程混入其中: // 1)broken当前generation; // 2)唤醒CyclicBarrier阻塞的所有线程;重新开始,注意:此时没有改变party的Generation // 3)抛出线程中断异常 breakBarrier(); throw new InterruptedException(); } // 每次都将计数器的值减1,即未到场的参会人个数减一。 int index = --count; // 当前线程是最后一个到达party的线程时,回调barrierCommand,然后唤醒所有阻塞在条件变量上的线程。 // 若回调barrierCommand正常完成,则不需要手动调用reset()就可进入新的一代,因为这里调用了nextGeneration() if (index == 0) { boolean ranAction = false; try { // 唤醒所有线程前先执行指定的任务 final Runnable command = barrierCommand; if (command != null) // 若回调方法运行出现异常,异常直接上抛 command.run(); ranAction = true; // 唤醒所有阻塞的线程,并进入下一代party,修改generation。 nextGeneration(); return 0; } finally { // barrierCommand回调方法发生了异常,那么设置broker标志位,并唤醒所有阻塞的线程 if (!ranAction) breakBarrier(); } } // 循环等待最后一个参与party的线程 唤醒自己 或者 被中断 或者 等待超时 for (;;) { try { // 根据传入的参数决定是定时等待还是非定时等待 if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // 发生中断异常 // 如果当前线程在CyclicBarrier的等待唤醒期间(即:g没有被改变)被中断,则中断generation,唤醒所有线程 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // 如果当前线程已经完成在CyclicBarrier上的等待(即:g被改变),则直接标志当前线程被中断 Thread.currentThread().interrupt(); } } // 如果线程因为broken generation操作而被唤醒则抛出异常 if (g.broken) throw new BrokenBarrierException(); // 如果g != generation ,说明CyclicBarrier换代了(即:generation改变了),线程因此被唤醒的话,则返回还有多个参会者没进来,即计数器的值count。 if (g != generation) return index; // 如果线程因为时间到了而被唤醒,这broken generation 并抛出异常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); }}

3、breakBarrier()方法:

意味着有人搞破坏,游戏中途结束,将所有的等待线程全部唤醒。await()方法通过抛出BrokenBarrierException 异常返回;

private void breakBarrier() { // 中断当前generation,即打破栅栏 generation.broken = true; // 设置 未参会者数量count 等于 所有需要参会者数量 count = parties; // 唤醒所有阻塞的线程 trip.signalAll();}

4、nextGeneration()方法:

开启栅栏新的一代。

private void nextGeneration() { // 唤醒所有阻塞的线程 trip.signalAll(); // 设置 未参会者数量count 等于 所有需要参会者数量 count = parties; // 生成栅栏的下一代Generation,这也是和breakBarrier()方法的区别 generation = new Generation();}

5、reset()方法:

​​重置一个栅栏​​:打破栅栏 中断当前代,await()方法通过抛出BrokenBarrierException 异常返回;开始新的下一代,重置count和generation。

public void reset() { final ReentrantLock lock = this.lock; // 上锁 lock.lock(); try { breakBarrier(); // 将所有参与party的线程唤醒 nextGeneration(); // 生成下一代 } finally { lock.unlock(); }}

若​​barrierCommand正常完成​​​,则​​不需要手动调用reset()​​​就可​​自动进入新的一代​​,因为运行barrierCommand之后调用了nextGeneration()。

四、总结

简单说就是一个​​ReentrantLock​​​加上一个​​Condition条件变量​​​实现并发控制和多个线程的阻塞等待。 并且采用​​​多线程协作​​​机制,在多个线程协作过程中,只要有​​一个线程被中断或者发生异常​​​,则​​整个协作过程取消​​。

CyclicBarrier和CountDownLatch相同的是,它们都能让多个线程协调在某一个节点上等待;下面我们看一下他们的区别?

2、CyclicBarrier和CountDownLatch的区别?

1> 是否可复用?​​CountDownLatch​​的计数器只能使用一次;​​CyclicBarrier​​​的计数器可以使用reset()方法重置进而​​循环使用​​。2> 唤醒方式?​​CountDownLatch​​​是多线程阻塞后,需要等待外界条件达到某种状态才会​​被统一唤醒​​​,即​​CountDownLatch​​​还需要​​额外的countDown()唤醒​​操作。​​CyclicBarrier​​​是​​多线程协作​​​,当线程达到等待数量或者一个线程出现异常或被中断时​​自动放行​​​;即​​CyclicBarrier中只需要await()​​,不需要额外的唤醒操作;3> 从性能来看:​​CountDownLatch​​​的性能大于CyclicBarrier,因为​​CountDownLatch​​​是自己采用​​CAS​​​,利用​​共享锁​​的原理实现;而​​CyclicBarrier​​​是采用​​ReentrantLock独占锁​​​ +​​Condition条件变量​​实现;4> 从应用场景来看:​​CyclicBarrier​​​能处理更为​​复杂的业务场景​​。5> 简言之:可以理解为​​CountDownLatch​​是一个计数器,线程完成一个记录一个,只不过计数不是递增而是递减。而​​CyclicBarrier​​是一个阀门,所有线程都到了 ,阀门才打开。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:RocketMQ之Consumer端负载均衡源码解析(RebalanceImpl)
下一篇:详解eclipse项目中的.classpath文件原理
相关文章

 发表评论

暂时没有评论,来抢沙发吧~