企业在数字化转型中如何利用常用前端框架提高开发效率并确保安全合规?
597
2022-10-26
AQS-CountDownLatch&CyclicBarrier&Semaphore
AQS-CountDownLatch&CyclicBarrier&Semaphore
文章目录
AQS-CountDownLatch&CyclicBarrier&Semaphore
CountDownLatch
源码分析
Semaphore
源码分析
CyclicBarrier
源码分析
CountDownLatch
CountDownLatch是JUC包下的一个基于AQS实现的并发工具类,利用他可以实现类似计数器的功能,比如有一个任务A,他要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现
简单使用demo:
public static void main(String[] args) throws Exception{ final CountDownLatch countDownLatch = new CountDownLatch(2); Thread thread1 = new Thread(()->{ try { System.out.println("线程一执行中。。。。"); Thread.sleep(2000); System.out.println("线程一执行完成。"); countDownLatch.countDown(); }catch (Exception e){ } },"t1"); Thread thread2 = new Thread(()->{ try { System.out.println("线程二执行中。。。。"); Thread.sleep(2000); System.out.println("线程二执行完成。"); countDownLatch.countDown(); }catch (Exception e){ } },"t2"); thread1.start(); thread2.start(); countDownLatch.await(); System.out.println("----------线程一二执行完成,继续执行主线程"); }
执行结果:
CountDownLatch中最重要的三个方法:
public void countDown() { }; //将count值减1public boolean await(long timeout, TimeUnit unit) throws InterruptedException { }; //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行public void await() throws InterruptedException { }; //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
源码分析
构造方法:
传入计数大小,并且实例化同步器
public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
同步器实现:
// 基于AQS实现private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } // 获取共享锁 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // 释放共享锁 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }}
await方法:
public void await() throws InterruptedException { // await方法就是可中断的获取共享锁 sync.acquireSharedInterruptibly(1); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1;// getState中获取的state,state是我们传入的count,即count != 0返回-1 } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } // 返回-1后就会将其线程封装为节点链接到等待队列中,自旋获取共享锁 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
countDown方法
public void countDown() { // countDown方法其实就是释放共享锁 sync.releaseShared(1); }
Semaphore
Semaphore是和CountDownLatch一样,也是JUC包下的并发工具类,他可以控制并发访问资源的线程数通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。
public class Test { public static void main(String[] args) { int N = 8; //工人数 Semaphore semaphore = new Semaphore(5); //机器数目 for(int i=0;i 执行结果: 工人0占用一个机器在生产...工人1占用一个机器在生产...工人2占用一个机器在生产...工人4占用一个机器在生产...工人5占用一个机器在生产...工人0释放出机器工人2释放出机器工人3占用一个机器在生产...工人7占用一个机器在生产...工人4释放出机器工人5释放出机器工人1释放出机器工人6占用一个机器在生产...工人3释放出机器工人7释放出机器工人6释放出机器 源码分析 构造方法: public Semaphore(int permits) { // 直接传入许可证数目 sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { // 传入许可证加是否是公平锁的标志,等待时间越久的越先获取许可 sync = fair ? new FairSync(permits) : new NonfairSync(permits); } 获取许可: public void acquire() throws InterruptedException { // 获取共享锁 sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { // 等待队列该线程前面有节点在等待,阻塞 if (hasQueuedPredecessors()) return -1; // 无线程等待,许可证减少 int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } } 释放许可证 public void release() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int releases) { // 当前许可加上释放的许可,CAS更新后返回true for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } CyclicBarrier 通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做barrier,当调用await()方法之后,线程就处于barrier了。 public class Test { public static void main(String[] args) { int N = 4; CyclicBarrier barrier = new CyclicBarrier(N); for(int i=0;i 结果: 线程Thread-0正在写入数据...线程Thread-3正在写入数据...线程Thread-2正在写入数据...线程Thread-1正在写入数据...线程Thread-2写入数据完毕,等待其他线程写入完毕线程Thread-0写入数据完毕,等待其他线程写入完毕线程Thread-3写入数据完毕,等待其他线程写入完毕线程Thread-1写入数据完毕,等待其他线程写入完毕所有线程写入完毕,继续处理其他任务...所有线程写入完毕,继续处理其他任务...所有线程写入完毕,继续处理其他任务...所有线程写入完毕,继续处理其他任务... CountDownLatch 可以实现多个线程的协调,在所有指定线程完成任务后,主线程才继续任务,但是CountDownLatch 有个缺点就是,不可重用,每次都需要创建新的CountDownLatch 实例 源码分析 构造方法 当parties个线程准备就绪后即都调用await方法后,执行barrierAction public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } 准备就绪后,啥事不干 public CyclicBarrier(int parties) { this(parties, null); } await方法 有一个我们常用的方法 await,还有一个内部类,Generation ,仅有一个参数,有什么作用呢? 在 CyclicBarrier 中,有一个 “代” 的概念,因为 CyclicBarrier 是可以复用的,那么每次所有的线程通过了栅栏,就表示一代过去了,就像我们的新年一样。当所有人跨过了元旦,日历就更新了。 CyclicBarrier 支持在所有线程通过栅栏的时候,执行一个线程的任务。 private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // 锁住 lock.lock(); try { // 当前代 final Generation g = generation; // 如果这代损坏了,抛出异常 if (g.broken) throw new BrokenBarrierException(); // 如果线程中断了,抛出异常 if (Thread.interrupted()) { // 将损坏状态设置为 true // 并通知其他阻塞在此栅栏上的线程 breakBarrier(); throw new InterruptedException(); } // 获取下标 int index = --count; // 如果是 0 ,说明到头了 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; // 执行栅栏任务 if (command != null) command.run(); ranAction = true; // 更新一代,将 count 重置,将 generation 重置. // 唤醒之前等待的线程 nextGeneration(); // 结束 return 0; } finally { // 如果执行栅栏任务的时候失败了,就将栅栏失效 if (!ranAction) breakBarrier(); } } for (;;) { try { // 如果没有时间限制,则直接等待,直到被唤醒 if (!timed) trip.await(); // 如果有时间限制,则等待指定时间 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // g == generation >> 当前代 // ! g.broken >>> 没有损坏 if (g == generation && ! g.broken) { // 让栅栏失效 breakBarrier(); throw ie; } else { // 上面条件不满足,说明这个线程不是这代的. // 就不会影响当前这代栅栏执行逻辑.所以,就打个标记就好了 Thread.currentThread().interrupt(); } } // 当有任何一个线程中断了,会调用 breakBarrier 方法. // 就会唤醒其他的线程,其他线程醒来后,也要抛出异常 if (g.broken) throw new BrokenBarrierException(); // g != generation >>> 正常换代了 // 一切正常,返回当前线程所在栅栏的下标 // 如果 g == generation,说明还没有换代,那为什么会醒了? // 因为一个线程可以使用多个栅栏,当别的栅栏唤醒了这个线程,就会走到这里,所以需要判断是否是当前代。 // 正是因为这个原因,才需要 generation 来保证正确。 if (g != generation) return index; // 如果有时间限制,且时间小于等于0,销毁栅栏,并抛出异常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); }}
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~