《JUC》CountDownLatch原理、源码解析

网友投稿 742 2022-09-03

《JUC》CountDownLatch原理、源码解析

《JUC》CountDownLatch原理、源码解析

一、概述

1、作用

允许一个或多个线程等待,直到在其他线程中执行的一组操作完成,再继续执行。

2、demo

常用方法:

// 创建一个count为2的计数器;CountDownLatch count = new CountDownLatch(2);// 对计数器进行递减1操作,当计数器递减至0时,当前线程会去唤醒阻塞队列里的所有线程;count.countDown();//阻塞当前线程,将当前线程加入阻塞队列;直到count为0时 唤醒当前线程。count.await();// 在timeout的时间之内阻塞当前线程,时间一过当前线程就可以继续执行。count.await(long timeout, TimeUnit unit);

该demo源自CountDownLatch源码上的注释。

public class CountDownLatchTest { static class WorkerRunnable implements Runnable { private final CountDownLatch doneSignal; private final int i; WorkerRunnable(CountDownLatch doneSignal, int i) { this.doneSignal = doneSignal; this.i = i; } public void run() { doWork(i); doneSignal.countDown(); } void doWork(int i) { System.out.println(i); } } public static void main(String[] args) throws InterruptedException { CountDownLatch doneSignal = new CountDownLatch(5); ExecutorService e = Executors.newFixedThreadPool(3); for (int i = 0; i < 5; ++i) // create and start threads e.execute(new WorkerRunnable(doneSignal, i)); doneSignal.await(); // wait for all to finish System.out.println("所有线程执行结束,回到主线程"); e.shutdown(); }}

3、应用场景

CountDownLatch可以用于 一个或者多个线程在执行之前 依赖于 某些前提业务先执行的场景。

多线程加载文件中的数据到内存中,等到所有的文件数据都加载完之后;主线程继续执行。

源码中的应用:

RocketMQ中BrokerOuterAPI#registerBrokerAll()方法中,Broker采用线程池机制向所有的NameServer中发送心跳时用CountDownLatch来阻塞主线程,保证同步发送

二、实现原理

1> CountDownLatch在并发编程中充当一个计时器,其维护一个变量count用于表示操作数;当一个操作执行完成时,原子性的减少一个count;当count变量为0时,代表所有操作均已完成,唤醒等待的线程。2> 具体代码实现:CountDownLatch内部集成一个​​Sync​​​类,从其构造函数可以看到:表示操作数的​​count​​值和核心操作都由同步器类Sync完成。而Sync集成自AQS,所以本质上CountDownLatch就是使用AQS的共享锁机制,实现锁的获取、线程的阻塞/唤醒;当调用​​countDownLatch.await()​​​方法时,会尝试获取共享锁,只有当​​count​​的值为0时才能获取到;未获取到锁的话,则创建一个线程节点Node,通过AQS的​​addWaiter()​​方法加入到AQS的阻塞队列,并把当前线程阻塞挂起。当调用​​countDownLatch.countDown()​​​方法时,会在AQS中释放一次共享锁,即对​​count​​​,也就是AQS的​​state​​变量进行减一操作;当state=0时,将AQS阻塞队列里的阻塞线程全部唤醒;即阻塞在await()方法那的线程获取到共享锁,继续往下执行。3> 另外根据happens-before原则,await()方法一定happens-before countDown()方法。

三、源码解析

1、核心变量和构造函数

CountDownLatch下就一个Sync变量,由此可见:CountDownLatch的所有操作都是围绕Sync类来的。

// Sync同步器,通过该对象调用AQS的方法,进而实现线程的阻塞和唤醒private final Sync sync;// 根据传入的count值初始化计数器public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); // 创建一个Sync对象 并传入count值,Sync内部将会执行setState(count) this.sync = new Sync(count);}

private static final class Sync extends AbstractQueuedSynchronizer { Sync(int count) { setState(count); }}

2、await()

当我们调用countDownLatch.wait()的时候,会创建一个线程节点Node,加入到AQS的阻塞队列,并把改线程挂起阻塞。

内部调用AQS的实现方法​​acquireSharedInterruptibly()​​​,我们可以看到模板方法​​acquireSharedInterruptibly()​​,响应线程中断式的获取锁。

public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1);}

下面是AQS的​​acquireSharedInterruptibly()​​方法:

// AQS类的模板方法public final void acquireSharedInterruptibly(int arg) throws InterruptedException { // 响应线程的中断 if (Thread.interrupted()) throw new InterruptedException(); // 调用子类tryAcquireShared()方法的实现,让子类实现自己获取信号量的机制。 // tryAcquireShared()方法返回-1表示获取锁失败,阻塞当前线程节点 if (tryAcquireShared(arg) < 0) // 线程获取锁失败,加入到AQS的同步队列中阻塞等待。 doAcquireSharedInterruptibly(arg);}

CountDownLatch的内部类Sync中实现了尝试获取共享锁(​​tryAcquireShared​​)逻辑;

// Sync类中protected int tryAcquireShared(int acquires) { // 如果state变量为0,表示共享锁已经被取完了,无法再获取共享锁。即:当AQS中的state变量不为0时,也就是CountDownLatch中的count还不为0,需要阻塞线程。 return (getState() == 0) ? 1 : -1;}

CountDownLatch实际上是一种​​共享锁机制​​​,即锁可以同时被多个线程获取。因为一旦count被减小为0,所有的线程走到await()方法时,都能够顺利通过,不会因为获取不到锁而阻塞。 而且Sync直接将count值作为AQS的state的值,只有state的值为0,线程才能获取锁,即获得执行线程的权限。

3、countDown()

当我们调用countDownLatch.down()方法的时候,会对计数器进行减1操作,AQS内部是通过释放共享锁的方式,对state进行减1操作;当state=0的时候会将AQS阻塞队列里的阻塞线程全部唤醒。

public void countDown() { sync.releaseShared(1);}

滋滋滋,又是完成依赖AQS的实现;

// 调用AQS类的模板方法public final boolean releaseShared(int arg) { // 子类自己实现自己的释放锁逻辑 if (tryReleaseShared(arg)) { // 释放锁成功之后,唤醒阻塞队列中的线程。 doReleaseShared(); return true; } return false;}

CountDownLatch的内部类Sync中实现了尝试释放共享锁(​​tryAcquireShared​​)逻辑;

protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); // 如果 state == 0,表明没有线程持有锁 if (c == 0) return false; int nextc = c-1; // CAS 修改state变量,如果修改后的state = 0 表明释放锁成功 if (compareAndSetState(c, nextc)) return nextc == 0; }}

四、总结

啊这,CountDownLatch真的是非常简单,代码包括注释也就300多行,除了其内部类Sync重写了AQS的两个方法,底层真的是几乎完全靠AQS实现。

关于AQS的实现原理,我们后面的文章见。

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:RocketMQ:常用命令汇总(mqadmin、mqnamesrv、mqbroker)
下一篇:PHP与WorkerMan实现简单的多人在线聊天(thinkphp workerman聊天)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~