AQS同步组件-CyclicBarrier(循环屏障)解析和用例

网友投稿 673 2022-10-23

AQS同步组件-CyclicBarrier(循环屏障)解析和用例

AQS同步组件-CyclicBarrier(循环屏障)解析和用例

CyclicBarrier原理

CyclicBarrier 的字面意思是可循环使用(Cyclic)的屏障(Barrier)。它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。当某个线程调用了await方法之后,就会进入等待状态,并将计数器+1,直到所有线程调用await方法使计数器为CyclicBarrier设置的值,才可以继续执行,由于计数器可以重复使用,所以我们又叫它循环屏障。CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。CyclicBarrier可以用于多线程计算数据,最后合并计算结果的应用场景。

源码分析

/** * 创建一个新的CyclicBarrier当给定数量的参与方(线程)等待它时,它将触发, * 并且在障碍触发时不执行预定义的操作。 * * @param 在barrier被触发之前必须调用await()的线程数 * @throws IllegalArgumentException 如果parties小于1抛出异常 */ public CyclicBarrier(int parties) { this(parties, null); } /** * * 当前线程调用await方法的线程告知CyclicBarrier已经到达屏障,然后当前线程被阻塞 * * @return 当前线程的到达索引,其中索引为- 1表示第一个到达的,0表示最后一个到达的 * @throws InterruptedException 如果当前线程在等待时被中断 * @throws BrokenBarrierException 如果另一个线程在当前线程等待时被中断或超时, * 或者屏障被重置,或者在调用await方法时屏障被破坏,或者屏障操作(如果存在)由于异常而失败 */ public int await() throws InterruptedException, BrokenBarrierException { try { return dowait(false, 0L); } catch (TimeoutException toe) { throw new Error(toe); // cannot happen } }

使用案例

await()

/** * 线程数量 */ private final static int threadCount = 15; /** * 屏障拦截的线程数量为5,表示每次屏障会拦截5个线程 */ private static CyclicBarrier barrier = new CyclicBarrier(5); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < threadCount; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready {}", threadNum,barrier.getNumberWaiting()); //每次调用await方法后计数器+1,当前线程被阻塞 barrier.await(); log.info("{} continue", threadNum); }

输出结果:

16:16:40.245 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 0 is ready 0 16:16:41.244 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 1 is ready 1 16:16:42.244 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 2 is ready 2 16:16:43.244 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 3 is ready 3 16:16:44.245 [pool-1-thread-5] INFO com.zjq.aqs.CyclicBarrier - 4 is ready 4 16:16:44.245 [pool-1-thread-5] INFO com.zjq.aqs.CyclicBarrier - 4 continue 16:16:44.245 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 0 continue 16:16:44.245 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 1 continue 16:16:44.245 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 2 continue 16:16:44.245 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 3 continue 16:16:45.245 [pool-1-thread-6] INFO com.zjq.aqs.CyclicBarrier - 5 is ready 0 16:16:46.245 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 6 is ready 1 16:16:47.246 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 7 is ready 2 16:16:48.246 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 8 is ready 3 16:16:49.246 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 9 is ready 4 16:16:49.246 [pool-1-thread-6] INFO com.zjq.aqs.CyclicBarrier - 5 continue 16:16:49.246 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 9 continue 16:16:49.246 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 6 continue 16:16:49.246 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 8 continue 16:16:49.246 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 7 continue 16:16:50.247 [pool-1-thread-5] INFO com.zjq.aqs.CyclicBarrier - 10 is ready 0 16:16:51.247 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 11 is ready 1 16:16:52.247 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 12 is ready 2 16:16:53.248 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 13 is ready 3 16:16:54.248 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 14 is ready 4 16:16:54.248 [pool-1-thread-4] INFO com.zjq.aqs.CyclicBarrier - 14 continue 16:16:54.248 [pool-1-thread-5] INFO com.zjq.aqs.CyclicBarrier - 10 continue 16:16:54.248 [pool-1-thread-3] INFO com.zjq.aqs.CyclicBarrier - 12 continue 16:16:54.248 [pool-1-thread-2] INFO com.zjq.aqs.CyclicBarrier - 11 continue 16:16:54.248 [pool-1-thread-1] INFO com.zjq.aqs.CyclicBarrier - 13 continue

通过输出结果可以知道,每次屏障会阻塞5个线程,5个线程执行后计数器达到预设值,继续执行后续操作。

await(long timeout, TimeUnit unit)

/** * 线程数量 */ private final static int threadCount = 15; /** * 屏障拦截的线程数量为5,表示每次屏障会拦截5个线程 */ private static CyclicBarrier barrier = new CyclicBarrier(5); public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newCachedThreadPool(); for (int i = 0; i < threadCount; i++) { final int threadNum = i; Thread.sleep(1000); executor.execute(() -> { try { race(threadNum); } catch (Exception e) { log.error("exception", e); } }); } executor.shutdown(); } private static void race(int threadNum) throws Exception { Thread.sleep(1000); log.info("{} is ready{}", threadNum,barrier.getNumberWaiting()); //每次调用await方法后计数器+1,当前线程被阻塞 //等待2s.为了使在发生异常的时候,不影响其他线程,一定要catch //由于设置了超时时间后阻塞的线程可能会被中断,抛出BarrierException异常,如果想继续往下执行,需要加上try-catch try { barrier.await(2, TimeUnit.SECONDS); }catch (Exception e){ //查看执行异常的线程 log.info("线程{} 执行异常,阻塞被中断?{}",threadNum,barrier.isBroken()); } log.info("{} continue", threadNum); }

输出结果:

17:06:24.440 [pool-1-thread-1] INFO com.zjq.CyclicBarrier - 0 is ready0 17:06:25.435 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 1 is ready1 17:06:26.435 [pool-1-thread-3] INFO com.zjq.CyclicBarrier - 2 is ready2 17:06:26.455 [pool-1-thread-1] INFO com.zjq.CyclicBarrier - 线程0 执行异常,阻塞被中断?true 17:06:26.456 [pool-1-thread-3] INFO com.zjq.CyclicBarrier - 线程2 执行异常,阻塞被中断?true 17:06:26.456 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 线程1 执行异常,阻塞被中断?true 17:06:26.456 [pool-1-thread-1] INFO com.zjq.CyclicBarrier - 0 continue 17:06:26.456 [pool-1-thread-3] INFO com.zjq.CyclicBarrier - 2 continue 17:06:26.456 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 1 continue 17:06:27.434 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 3 is ready0 17:06:27.434 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 线程3 执行异常,阻塞被中断?true 17:06:27.434 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 3 continue 17:06:28.435 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 4 is ready0 17:06:28.435 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 线程4 执行异常,阻塞被中断?true 17:06:28.435 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 4 continue 17:06:29.435 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 5 is ready0 17:06:29.435 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 线程5 执行异常,阻塞被中断?true 17:06:29.435 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 5 continue 17:06:30.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 6 is ready0 17:06:30.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 线程6 执行异常,阻塞被中断?true 17:06:30.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 6 continue 17:06:31.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 7 is ready0 17:06:31.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 线程7 执行异常,阻塞被中断?true 17:06:31.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 7 continue 17:06:32.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 8 is ready0 17:06:32.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 线程8 执行异常,阻塞被中断?true 17:06:32.436 [pool-1-thread-2] INFO com.zjq.CyclicBarrier - 8 continue 17:06:33.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 9 is ready0 17:06:33.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 线程9 执行异常,阻塞被中断?true 17:06:33.436 [pool-1-thread-4] INFO com.zjq.CyclicBarrier - 9 continue

CyclicBarrier(int parties, Runnable barrierAction)

/** * 线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景 */ private static CyclicBarrier barrier = new CyclicBarrier(5, () -> { log.info("callback is running"); });

输出结果:

17:11:38.867 [pool-1-thread-1] INFO com.zjq.CyclicBarrier3 - 0 is ready 17:11:38.966 [pool-1-thread-2] INFO com.zjq.CyclicBarrier3 - 1 is ready 17:11:39.067 [pool-1-thread-3] INFO com.zjq.CyclicBarrier3 - 2 is ready 17:11:39.167 [pool-1-thread-4] INFO com.zjq.CyclicBarrier3 - 3 is ready 17:11:39.268 [pool-1-thread-5] INFO com.zjq.CyclicBarrier3 - 4 is ready 17:11:39.268 [pool-1-thread-5] INFO com.zjq.CyclicBarrier3 - callback is running 17:11:39.268 [pool-1-thread-5] INFO com.zjq.CyclicBarrier3 - 4 continue 17:11:39.268 [pool-1-thread-1] INFO com.zjq.CyclicBarrier3 - 0 continue 17:11:39.268 [pool-1-thread-2] INFO com.zjq.CyclicBarrier3 - 1 continue 17:11:39.268 [pool-1-thread-3] INFO com.zjq.CyclicBarrier3 - 2 continue 17:11:39.268 [pool-1-thread-4] INFO com.zjq.CyclicBarrier3 - 3 continue 17:11:39.369 [pool-1-thread-6] INFO com.zjq.CyclicBarrier3 - 5 is ready 17:11:39.470 [pool-1-thread-7] INFO com.zjq.CyclicBarrier3 - 6 is ready 17:11:39.570 [pool-1-thread-8] INFO com.zjq.CyclicBarrier3 - 7 is ready 17:11:39.671 [pool-1-thread-9] INFO com.zjq.CyclicBarrier3 - 8 is ready 17:11:39.772 [pool-1-thread-10] INFO com.zjq.CyclicBarrier3 - 9 is ready 17:11:39.772 [pool-1-thread-10] INFO com.zjq.CyclicBarrier3 - callback is running 17:11:39.772 [pool-1-thread-10] INFO com.zjq.CyclicBarrier3 - 9 continue 17:11:39.772 [pool-1-thread-6] INFO com.zjq.CyclicBarrier3 - 5 continue 17:11:39.772 [pool-1-thread-9] INFO com.zjq.CyclicBarrier3 - 8 continue 17:11:39.772 [pool-1-thread-7] INFO com.zjq.CyclicBarrier3 - 6 continue 17:11:39.772 [pool-1-thread-8] INFO com.zjq.CyclicBarrier3 - 7 continue

CyclicBarrier和CountDownLatch的区别

CountDownLatch的计数器只能使用一次。而CyclicBarrier的计数器可以使用reset()方法重置。所以CyclicBarrier能处理更为复杂的业务场景,比如如果计算发生错误,可以重置计数器,并让线程们重新执行一次。 CountDownLatch主要用于实现一个或n个线程需要等待其他线程完成某项操作之后,才能继续往下执行,描述的是一个或n个线程等待其他线程的关系,而CyclicBarrier是多个线程相互等待,知道满足条件以后再一起往下执行。描述的是多个线程相互等待的场景 CyclicBarrier还提供其他有用的方法,比如getNumberWaiting方法可以获得CyclicBarrier阻塞的线程数量。isBroken方法用来知道阻塞的线程是否被中断。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:基于Python的接口自动化框架
下一篇:Bee.WeiXin- 微信公众平台开发框架
相关文章

 发表评论

暂时没有评论,来抢沙发吧~