AQS-CountDownLatch&CyclicBarrier&Semaphore

网友投稿 597 2022-10-26

AQS-CountDownLatch&CyclicBarrier&Semaphore

AQS-CountDownLatch&CyclicBarrier&Semaphore

AQS-CountDownLatch&CyclicBarrier&Semaphore

文章目录

​​AQS-CountDownLatch&CyclicBarrier&Semaphore​​

​​CountDownLatch​​

​​源码分析​​

​​Semaphore​​

​​源码分析​​

​​CyclicBarrier​​

​​源码分析​​

CountDownLatch

CountDownLatch是JUC包下的一个基于AQS实现的并发工具类,利用他可以实现类似计数器的功能,比如有一个任务A,他要等待其他4个任务执行完毕之后才能执行,此时就可以利用CountDownLatch来实现

简单使用demo:

public static void main(String[] args) throws Exception{ final CountDownLatch countDownLatch = new CountDownLatch(2); Thread thread1 = new Thread(()->{ try { System.out.println("线程一执行中。。。。"); Thread.sleep(2000); System.out.println("线程一执行完成。"); countDownLatch.countDown(); }catch (Exception e){ } },"t1"); Thread thread2 = new Thread(()->{ try { System.out.println("线程二执行中。。。。"); Thread.sleep(2000); System.out.println("线程二执行完成。"); countDownLatch.countDown(); }catch (Exception e){ } },"t2"); thread1.start(); thread2.start(); countDownLatch.await(); System.out.println("----------线程一二执行完成,继续执行主线程"); }

执行结果:

CountDownLatch中最重要的三个方法:

public void countDown() { }; //将count值减1public boolean await(long timeout, TimeUnit unit) throws InterruptedException { }; //和await()类似,只不过等待一定的时间后count值还没变为0的话就会继续执行public void await() throws InterruptedException { }; //调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行

源码分析

构造方法:

传入计数大小,并且实例化同步器

public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }

同步器实现:

// 基于AQS实现private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L; Sync(int count) { setState(count); } int getCount() { return getState(); } // 获取共享锁 protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } // 释放共享锁 protected boolean tryReleaseShared(int releases) { // Decrement count; signal when transition to zero for (;;) { int c = getState(); if (c == 0) return false; int nextc = c-1; if (compareAndSetState(c, nextc)) return nextc == 0; } }}

await方法:

public void await() throws InterruptedException { // await方法就是可中断的获取共享锁 sync.acquireSharedInterruptibly(1); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1;// getState中获取的state,state是我们传入的count,即count != 0返回-1 } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } // 返回-1后就会将其线程封装为节点链接到等待队列中,自旋获取共享锁 private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }

countDown方法

public void countDown() { // countDown方法其实就是释放共享锁 sync.releaseShared(1); }

Semaphore

Semaphore是和CountDownLatch一样,也是JUC包下的并发工具类,他可以控制并发访问资源的线程数通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。

public class Test { public static void main(String[] args) { int N = 8; //工人数 Semaphore semaphore = new Semaphore(5); //机器数目 for(int i=0;i

执行结果:

工人0占用一个机器在生产...工人1占用一个机器在生产...工人2占用一个机器在生产...工人4占用一个机器在生产...工人5占用一个机器在生产...工人0释放出机器工人2释放出机器工人3占用一个机器在生产...工人7占用一个机器在生产...工人4释放出机器工人5释放出机器工人1释放出机器工人6占用一个机器在生产...工人3释放出机器工人7释放出机器工人6释放出机器

源码分析

构造方法:

public Semaphore(int permits) { // 直接传入许可证数目 sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { // 传入许可证加是否是公平锁的标志,等待时间越久的越先获取许可 sync = fair ? new FairSync(permits) : new NonfairSync(permits); }

获取许可:

public void acquire() throws InterruptedException { // 获取共享锁 sync.acquireSharedInterruptibly(1); } public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { // 等待队列该线程前面有节点在等待,阻塞 if (hasQueuedPredecessors()) return -1; // 无线程等待,许可证减少 int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }

释放许可证

public void release() { sync.releaseShared(1); } public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; } protected final boolean tryReleaseShared(int releases) { // 当前许可加上释放的许可,CAS更新后返回true for (;;) { int current = getState(); int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }

CyclicBarrier

通过它可以实现让一组线程等待至某个状态之后再全部同时执行。叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用。我们暂且把这个状态就叫做barrier,当调用await()方法之后,线程就处于barrier了。

public class Test { public static void main(String[] args) { int N = 4; CyclicBarrier barrier = new CyclicBarrier(N); for(int i=0;i数据..."); try { Thread.sleep(5000); //以睡眠来模拟写入数据操作 System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕"); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } System.out.println("所有线程写入完毕,继续处理其他任务..."); } }}

结果:

线程Thread-0正在写入数据...线程Thread-3正在写入数据...线程Thread-2正在写入数据...线程Thread-1正在写入数据...线程Thread-2写入数据完毕,等待其他线程写入完毕线程Thread-0写入数据完毕,等待其他线程写入完毕线程Thread-3写入数据完毕,等待其他线程写入完毕线程Thread-1写入数据完毕,等待其他线程写入完毕所有线程写入完毕,继续处理其他任务...所有线程写入完毕,继续处理其他任务...所有线程写入完毕,继续处理其他任务...所有线程写入完毕,继续处理其他任务...

CountDownLatch 可以实现多个线程的协调,在所有指定线程完成任务后,主线程才继续任务,但是CountDownLatch 有个缺点就是,不可重用,每次都需要创建新的CountDownLatch 实例

源码分析

构造方法

当parties个线程准备就绪后即都调用await方法后,执行barrierAction

public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; }

准备就绪后,啥事不干

public CyclicBarrier(int parties) { this(parties, null); }

await方法

有一个我们常用的方法 await,还有一个内部类,Generation ,仅有一个参数,有什么作用呢?

在 CyclicBarrier 中,有一个 “代” 的概念,因为 CyclicBarrier 是可以复用的,那么每次所有的线程通过了栅栏,就表示一代过去了,就像我们的新年一样。当所有人跨过了元旦,日历就更新了。

CyclicBarrier 支持在所有线程通过栅栏的时候,执行一个线程的任务。

private int dowait(boolean timed, long nanos) throws InterruptedException, BrokenBarrierException, TimeoutException { final ReentrantLock lock = this.lock; // 锁住 lock.lock(); try { // 当前代 final Generation g = generation; // 如果这代损坏了,抛出异常 if (g.broken) throw new BrokenBarrierException(); // 如果线程中断了,抛出异常 if (Thread.interrupted()) { // 将损坏状态设置为 true // 并通知其他阻塞在此栅栏上的线程 breakBarrier(); throw new InterruptedException(); } // 获取下标 int index = --count; // 如果是 0 ,说明到头了 if (index == 0) { // tripped boolean ranAction = false; try { final Runnable command = barrierCommand; // 执行栅栏任务 if (command != null) command.run(); ranAction = true; // 更新一代,将 count 重置,将 generation 重置. // 唤醒之前等待的线程 nextGeneration(); // 结束 return 0; } finally { // 如果执行栅栏任务的时候失败了,就将栅栏失效 if (!ranAction) breakBarrier(); } } for (;;) { try { // 如果没有时间限制,则直接等待,直到被唤醒 if (!timed) trip.await(); // 如果有时间限制,则等待指定时间 else if (nanos > 0L) nanos = trip.awaitNanos(nanos); } catch (InterruptedException ie) { // g == generation >> 当前代 // ! g.broken >>> 没有损坏 if (g == generation && ! g.broken) { // 让栅栏失效 breakBarrier(); throw ie; } else { // 上面条件不满足,说明这个线程不是这代的. // 就不会影响当前这代栅栏执行逻辑.所以,就打个标记就好了 Thread.currentThread().interrupt(); } } // 当有任何一个线程中断了,会调用 breakBarrier 方法. // 就会唤醒其他的线程,其他线程醒来后,也要抛出异常 if (g.broken) throw new BrokenBarrierException(); // g != generation >>> 正常换代了 // 一切正常,返回当前线程所在栅栏的下标 // 如果 g == generation,说明还没有换代,那为什么会醒了? // 因为一个线程可以使用多个栅栏,当别的栅栏唤醒了这个线程,就会走到这里,所以需要判断是否是当前代。 // 正是因为这个原因,才需要 generation 来保证正确。 if (g != generation) return index; // 如果有时间限制,且时间小于等于0,销毁栅栏,并抛出异常 if (timed && nanos <= 0L) { breakBarrier(); throw new TimeoutException(); } } } finally { lock.unlock(); }}

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:动态规划专题复习(一)计数问题
下一篇:OneThink- 基于 ThinkPHP 的开源内容管理框架
相关文章

 发表评论

暂时没有评论,来抢沙发吧~