微前端架构如何改变企业的开发模式与效率提升
420
2022-11-10
《JUC》CyclicBarrier原理/源码解析
一、概述
1、作用?
允许一组线程互相等待,直到到达某个公共屏障(barrier)点。因为该barrier在释放等待线程后可以重用,所以称它为循环的barrier。
2、使用场景?
用于多线程计算数据,最后合并计算结果的场景。
3、常用类方法?
await():告诉CyclicBarrier,线程已经到达了屏障,计数减一;然后阻塞线程,直到count为0。reset():重置CyclicBarrier的未加入到party的数量count和当前代Generation。
4、案例
public class CyclicBarrierTest { public static void main(String[] args) throws Exception { CyclicBarrier cyclicBarrier = new CyclicBarrier(2); new Thread(() -> { System.out.println("work thread start + 1"); try { TimeUnit.SECONDS.sleep(1); cyclicBarrier.await(); // 睡一会,让主线程先干活,work thread再继续 TimeUnit.MILLISECONDS.sleep(100); System.out.println("main thread OK, work thread go on!"); } catch (Exception e) { e.printStackTrace(); } }, "work-thread").start(); // 主线程在这等着,等其他线程调用await()方法之后,大家再一起执行 cyclicBarrier.await(); System.out.println("main end"); }}
二、原理
1)CyclicBarrier中一个generation代表了每一代,通过这个实现CyclicBarrier的复用。parties变量用来表示参与party的线程数;count变量代表了还没到party的线程数;外加个ReentrantLock锁和一个Condition条件变量实现线程的并发和阻塞。2)在CyclicBarrier类的内部有一个计数器count,每个线程在到达屏障点的时候都会调用await()方法将自己阻塞排队,并将计数器count减1; 3)当计数器count减为0的时候,所有因调用await()方法而被阻塞的线程将被唤醒。 4)线程的排队进入party通过ReentrantLock实现;进入party后睡眠等待所有参会者通过锁的条件等待Condition实现。
三、源码解析
1、成员变量和构造器
CyclicBarrier内部是通过条件队列trip对线程进行阻塞;两个int型的变量parties和count:parties表示每次拦截的线程数,即party的所有参会者;count表示还未拦截的线程数,即还有多少参会者没到party;它的初始值和parties相同,调用await()方法减1,减为0时将所有阻塞在条件变量上线程唤醒。静态内部类Generation,代表栅栏的当前代,就像玩游戏时代表的本局游戏,利用它可以实现循环等待。barrierCommand,表示换代前执行的任务。当前代结束会执行该任务,然后自动开启下一代。
public class CyclicBarrier { // 该类用于CyclicBarrier的一次协同是否完成(正常完成、异常完成) // reset后会复用该结构,每一次的party都会生成一个该类的新实例 private static class Generation { // 当前party有没有被强制中断,false表示没有 boolean broken = false; } // 同步锁,线程进入 条件等待 时需要获取锁 private final ReentrantLock lock = new ReentrantLock(); // 用于阻塞线程的条件变量(有未到party的线程,已到party的线程则等待在该条件变量上) private final Condition trip = lock.newCondition(); // 参与party的人数 private final int parties; // 当所有的线程都参与到了party中, 回调的方法 private final Runnable barrierCommand; // 当前party private Generation generation = new Generation(); // 还未到party的线程数 private int count; // 构造器 public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }}
2、await()方法
CyclicBarrier的核心方法是await(),该方法是线程相互等待的关键,它有两种实现:一种是带等待超时的,一种是不带等待超时,本质上都是调用了同一个方法dowait(),只是带等待超时的多传了一个时间。
public int await() throws InterruptedException, BrokenBarrierException { try { // 直接调用自己的dowait()方法 return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen }}
CyclicBarrier#dowait()方法:
首先因为await()是同步的,需要先加互斥锁ReentrantLock;每次进来都将count减1,减完立马进行判断看看count是否等于0:如果等于0,则执行换代前要执行的任务barrierCommand,然后唤醒所有阻塞等待的线程,接着自动进入CyclicBarrier的下一代;将计数器count的值重新设为parties。如果barrierCommand运行异常,则打破栅栏的当前代,唤醒所有阻塞等待的线程。count不等于0,这进入for循环:不是超时等待,直接调用Condition.await()阻塞当前线程。是超时等待,就在nanos时间内循环竞争锁;如果当前线程在await()获取锁过程中被中断了:在当前代还没结束之前打破栅栏,即游戏在中途被打断,则设置generation的broken状态为true并唤醒所有线程。当前代已经结束,则直接中断当前线程。线程被唤醒后进行下面三个判断:如果线程因为broken generation操作(即调用breakBarrier()方法)而被唤醒则抛出异常;如果线程因为CyclicBarrier正常换代被唤醒,则返回计数器count的值;如果线程因为超时而被唤醒打破栅栏并抛出TimeOut异常。注意:如果其中有一个线程因为等待超时而退出,那么整盘游戏也会结束,其他线程都会被唤醒。最后解锁。
private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException,TimeoutException { // 上锁 final ReentrantLock lock = this.lock; lock.lock(); try { // 保存当前party时的generation快照,generation更新后不会影响这里 final Generation g = generation; // 当party被强制中断时,抛出异常 if (g.broken) throw new BrokenBarrierException(); // 判断当前线程是否被中断 if (Thread.interrupted()) { // 有中断的线程混入其中: // 1)broken当前generation; // 2)唤醒CyclicBarrier阻塞的所有线程;重新开始,注意:此时没有改变party的Generation // 3)抛出线程中断异常 breakBarrier(); throw new InterruptedException(); } // 每次都将计数器的值减1,即未到场的参会人个数减一。 int index = --count; // 当前线程是最后一个到达party的线程时,回调barrierCommand,然后唤醒所有阻塞在条件变量上的线程。 // 若回调barrierCommand正常完成,则不需要手动调用reset()就可进入新的一代,因为这里调用了nextGeneration() if (index == 0) { boolean ranAction = false; try { // 唤醒所有线程前先执行指定的任务 final Runnable command = barrierCommand; if (command != null) // 若回调方法运行出现异常,异常直接上抛 command.run(); ranAction = true; // 唤醒所有阻塞的线程,并进入下一代party,修改generation。 nextGeneration(); return 0; } finally { // barrierCommand回调方法发生了异常,那么设置broker标志位,并唤醒所有阻塞的线程 if (!ranAction) breakBarrier(); } } // 循环等待最后一个参与party的线程 唤醒自己 或者 被中断 或者 等待超时 for (;;) { try { // 根据传入的参数决定是定时等待还是非定时等待 if (!timed) trip.await(); else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // 发生中断异常 // 如果当前线程在CyclicBarrier的等待唤醒期间(即:g没有被改变)被中断,则中断generation,唤醒所有线程 if (g == generation && ! g.broken) { breakBarrier(); throw ie; } else { // 如果当前线程已经完成在CyclicBarrier上的等待(即:g被改变),则直接标志当前线程被中断 Thread.currentThread().interrupt(); } } // 如果线程因为broken generation操作而被唤醒则抛出异常 if (g.broken) throw new BrokenBarrierException(); // 如果g != generation ,说明CyclicBarrier换代了(即:generation改变了),线程因此被唤醒的话,则返回还有多个参会者没进来,即计数器的值count。 if (g != generation) return index; // 如果线程因为时间到了而被唤醒,这broken generation 并抛出异常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); }}
3、breakBarrier()方法:
意味着有人搞破坏,游戏中途结束,将所有的等待线程全部唤醒。await()方法通过抛出BrokenBarrierException 异常返回;
private void breakBarrier() { // 中断当前generation,即打破栅栏 generation.broken = true; // 设置 未参会者数量count 等于 所有需要参会者数量 count = parties; // 唤醒所有阻塞的线程 trip.signalAll();}
4、nextGeneration()方法:
开启栅栏新的一代。
private void nextGeneration() { // 唤醒所有阻塞的线程 trip.signalAll(); // 设置 未参会者数量count 等于 所有需要参会者数量 count = parties; // 生成栅栏的下一代Generation,这也是和breakBarrier()方法的区别 generation = new Generation();}
5、reset()方法:
重置一个栅栏:打破栅栏 中断当前代,await()方法通过抛出BrokenBarrierException 异常返回;开始新的下一代,重置count和generation。
public void reset() { final ReentrantLock lock = this.lock; // 上锁 lock.lock(); try { breakBarrier(); // 将所有参与party的线程唤醒 nextGeneration(); // 生成下一代 } finally { lock.unlock(); }}
若barrierCommand正常完成,则不需要手动调用reset()就可自动进入新的一代,因为运行barrierCommand之后调用了nextGeneration()。
四、总结
简单说就是一个ReentrantLock加上一个Condition条件变量实现并发控制和多个线程的阻塞等待。 并且采用多线程协作机制,在多个线程协作过程中,只要有一个线程被中断或者发生异常,则整个协作过程取消。
CyclicBarrier和CountDownLatch相同的是,它们都能让多个线程协调在某一个节点上等待;下面我们看一下他们的区别?
2、CyclicBarrier和CountDownLatch的区别?
1> 是否可复用?CountDownLatch的计数器只能使用一次;CyclicBarrier的计数器可以使用reset()方法重置进而循环使用。2> 唤醒方式?CountDownLatch是多线程阻塞后,需要等待外界条件达到某种状态才会被统一唤醒,即CountDownLatch还需要额外的countDown()唤醒操作。CyclicBarrier是多线程协作,当线程达到等待数量或者一个线程出现异常或被中断时自动放行;即CyclicBarrier中只需要await(),不需要额外的唤醒操作;3> 从性能来看:CountDownLatch的性能大于CyclicBarrier,因为CountDownLatch是自己采用CAS,利用共享锁的原理实现;而CyclicBarrier是采用ReentrantLock独占锁 +Condition条件变量实现;4> 从应用场景来看:CyclicBarrier能处理更为复杂的业务场景。5> 简言之:可以理解为CountDownLatch是一个计数器,线程完成一个记录一个,只不过计数不是递增而是递减。而CyclicBarrier是一个阀门,所有线程都到了 ,阀门才打开。
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~