并发容器:手写一个阻塞队列

网友投稿 945 2022-10-08

并发容器:手写一个阻塞队列

并发容器:手写一个阻塞队列

Java中提供的阻塞队列

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。

支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入的元素,直到队列不满支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空

阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者从队列里取元素的线程。阻塞队列就是生产者用来存放元素,消费者用来获取元素的容器

插入和移除操作的4种处理方式

方法/处理方式

抛出异常

Queue接口

返回特殊值

Queue接口

一直阻塞

BlockingQueue接口

超时退出

BlockingQueue接口

插入方法

boolean add©

boolean offer(e)

void put(e)

boolean offer(e, time, unit)

解释

添加元素,添加成功返回true,如果队列满了,抛出IllegalStateException

添加元素,添加成功返回true,如果队列满了,返回false

添加元素,如果队列已经满了,则阻塞等待

添加元素,添加成功返回true,如果队列已经满了,则阻塞等待,指定时间已经过去还没能添加成功元素,则返回false

移除方法

E remove()

E poll()

E take()

E poll(time, unit)

解释

返回头结点,从队列中移除头节点,如果队列为空,抛出NoSuchElementException

返回头结点,从队列中移除头节点,如果队列为空,返回null

返回头结点,从队列中移除头节点,如果队列为空则阻塞等待

返回头结点,从队列中移除头节点,队列中没元素会一直阻塞等待,指定时间已经过去还没能拿到头节点,则返回null

检查方法

E element()

E peek()

不可用

不可用

解释

返回头结点,但是不从队列中移除头节点,如果队列为空,抛出NoSuchElementException

返回头结点,但是不从队列中移除头节点,如果队列为空,返回null

用等待/通知协调生产者和消费者

举一个多生产者,多消费者的例子,队列的大小为3,即队列大小为3时,生产者就不再生产

public class Producer implements Runnable { private ArrayList list; private int capacity; public Producer(ArrayList queue,int capacity) { this.list = queue; this.capacity = capacity; } @Override public void run() { synchronized (list) { while (true) { while (list.size() == capacity) { try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Object object = list.add(new Object()); System.out.println(Thread.currentThread().getName() + " 生产"); list.notifyAll(); } } }}

注意消费者和生产者都是用的notifyAll()[通知所有阻塞的线程]方法,而不是notify()[通知一个阻塞的线程]方法,因为有可能出现“生产者”唤醒“生产者”,消费者“唤醒”消费者的情况,因此有可能造成死锁。

这里以1个消费者,3个生产者为例说一下,消费者1获得锁还没产品呢,阻塞,接着生产者1获得锁生产完了,然后生产者2获得锁后生产完了,再然后生产者3获得锁生产完了,最后生产者1获得锁了,然后阻塞了,现在好了生产者和消费者都阻塞了,造成了死锁。notifyAll()则不会造成死锁,接着上面的步骤,生产者3生产完了释放锁后,会通知所有阻塞的线程,因此消费者1肯定有机会拿到锁来进行消费

public class Consumer implements Runnable { private List list; public Consumer(List queue) { this.list = queue; } @Override public void run() { synchronized (list) { while (true) { while (list.size() == 0) { try { list.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } Object object = list.remove(0); System.out.println(Thread.currentThread().getName() + " 消费"); list.notifyAll(); } } }}

public class Test { public static void main(String[] args) { ArrayList list = new ArrayList(3); for (int i = 0; i < 3; i++) { new Thread(new Producer(list, 3), "生产者" + i).start(); new Thread(new Consumer(list), "消费者" + i).start(); } }}

一部分结果

生产者0 生产生产者0 生产生产者0 生产消费者1 消费消费者1 消费消费者1 消费生产者1

把这个实例用阻塞队列来改写,先自己写一个阻塞队列,实现BlockingQueue接口,这里只展示了一部分重写的方法

public class MyBlockingQueue implements BlockingQueue { private int capacity; private List list; public MyBlockingQueue(int capacity) { this.capacity = capacity; this.list = new ArrayList(capacity); } @Override public void put(E e) throws InterruptedException { synchronized (this) { if (list.size() == capacity) { this.wait(); } list.add(e); this.notifyAll(); } } @Override public E take() throws InterruptedException { synchronized (this) { while (list.size() == 0) { this.wait(); } E value = list.remove(0); this.notifyAll(); return value; } }}

public class Producer implements Runnable { private BlockingQueue queue; public Producer(BlockingQueue queue) { this.queue = queue; } @Override public void run() { while (true) { try { queue.put(new Object()); System.out.println(Thread.currentThread().getName() + " 生产"); } catch (InterruptedException e) { e.printStackTrace(); } } }}

public class Consumer implements Runnable { private BlockingQueue queue; public Consumer(BlockingQueue queue) { this.queue = queue; } @Override public void run() { while (true) { try { Object object = queue.take(); System.out.println(Thread.currentThread().getName() + " 消费"); } catch (InterruptedException e) { e.printStackTrace(); } } }}

public class Test { public static void main(String[] args) { BlockingQueue queue = new MyBlockingQueue(3); for (int i = 0; i < 3; i++) { new Thread(new Producer(queue), "生产者" + i).start(); new Thread(new Consumer(queue), "消费者" + i).start(); } }}

这里将BlockingQueue的实现改为ArrayBlockingQueue,程序运行结果一样,和我们之前的例子比较,BlockingQueue其实就是不用我们自己写阻塞和唤醒的部分,直接看一下ArrayBlockingQueue的源码,其实和我自己实现的差不多,只不过是并发这部分源码用的是ReentLock,而我用的是synchronized

ArrayBlockingQueue

设置notEmpty和notFull2个Condition用来实现线程通信

// 设置同步队列的大小和锁是否公平public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition();}

往阻塞队列中放元素,当放满时,将线程阻塞到notFull这个等待队列。 可以正常放入阻塞队列中时,则唤醒notEmpty中的线程,让其消费元素

public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); }}

private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; // 循环数组实现 if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal();}

从阻塞队列中获取元素时,如果为空,则将线程阻塞到notEmpty这个等待队列。不为空,从阻塞队列取到元素时,还需要唤醒notFull中的线程,让其往阻塞队列中放元素

public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); }}

private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; // 放到数组的最后一个了,下一次从头开始放 if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) //更新iterators,不再分析 itrs.elementDequeued(); notFull.signal(); return x;}

最后说一下LZ的理解,个人感觉用ArrayBlockingQueue实现生产者和消费者,比我上面用synchronized的方式应该快很多,毕竟ArrayBlockingQueue只会是生成者通知消费者,或者消费者通知生产者,而synchronized不是,会造成很多不必要的锁竞争,当然并没有实验,有时间可以试试

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:微信小程序开发之顶部导航栏实例(小程序顶部导航栏按钮)
下一篇:php如何实现微信小程序支付及退款(微信小程序支付可以退款吗)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~