本帖最后由 不二晨 于 2018-7-12 09:49 编辑
ArrayBlockingQueue源码-JUC阻塞队列1前面写了好久片跟锁有关的博客,就是为了接下来的JUC系列文章,对锁熟悉的看JUC事半功倍了,我是强烈建议大家看下我之前的两篇关于Java锁的文章ReentrantLock可重新入锁
以及条件锁,没办法lock是整个JUC的基础。 我们今天主要看下阻塞队列的几个实现类: ArrayBlockingQueue
LinkedBlockingQueue
SynchronousQueue
DelayedWorkQueue
PriorityBlockingQueue
几个操作阻塞队列的方法: [td]操作 | 抛出异常 | 特殊值 | 阻塞 | 超时 | 插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) | 移除 | remove() | poll() | take() | poll(time, unit) | 检查 | element() | peek() | - | - |
官方有段描述阻塞队列的文字: BlockingQueue 不接受 null 元素
BlockingQueue 可以是限定容量的 没有的话就是Integer.MAX_VALUE
BlockingQueue 实现主要用于生产者-使用者队列,但它另外还支持 Collection 接口
BlockingQueue 实现是线程安全的
队列的使用官方有段生产者消费者代码: class Producer implements Runnable { private final BlockingQueue queue; Producer(BlockingQueue q) { queue = q; } public void run() { try { while(true) { queue.put(produce()); } } catch (InterruptedException ex) {} } Object produce() {} } class Consumer implements Runnable { private final BlockingQueue queue; Consumer(BlockingQueue q) { queue = q; } public void run() { try { while(true) { consume(queue.take()); } } catch (InterruptedException ex) { } } void consume(Object x) { } } class Setup { void main() { BlockingQueue q = new SomeQueueImplementation(); Producer p = new Producer(q); Consumer c1 = new Consumer(q); Consumer c2 = new Consumer(q); new Thread(p).start(); new Thread(c1).start(); new Thread(c2).start(); } }生产者不断往队列放数据,放满了,就会自动被阻塞
消费者不断从队列中取数据,取没了,就自动会被阻塞 先看最简答的ArrayBlockingQueue,听名字就知道底层是个数组
![]()
它是一个环形的队列FIFO,putIndex表示当前有多少个数据,takeIndex表示从队列拿出来的数量。
它必须要传入一个容量: public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull = lock.newCondition();}add方法 public boolean add(E e) { return super.add(e);}public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full");}调用父类AbstractQueue 最终调用自身的offer(e) offer一次不成功就抛出异常,否则成功返回,更上面表格是一致的。 offer(e)方法 public boolean offer(E e) { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lock(); try { if (count == items.length) return false; else { enqueue(e); return true; } } finally { lock.unlock(); }}private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal();}已经好久没有见过如此”整齐”的代码了,负责的逻辑已经被ReentrantLock控制住了,所以看起就是操作数组。
队列满了直接返回false
队列没满items[putIndex] = data;达到数组长度重置putIndex,达到环形队列目的,这里notEmpty.signal()在该方法中没什么用处。 put(e)方法 public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); }}这里使用的lock.lockInterruptibly() 之前没有说,当前线程如果调用了Thread.interrupt()方法,那么lockInterruptible()判断的Thread.interrupted()聚会成立,就会抛出异常,其实就是线程中断,该方法就抛出异常。
跟offer(e)不同的是当队列满了执行notFull.await() 当前线程进入了条件队列,稍后会挂起,等待被唤醒不熟悉的看我之前的条件锁
一旦被唤醒说明队列不满,将数据添加到队列中。
其实使用阻塞队列我们很多情况都是在使用该put和take方法,但是考虑一个问题我不想一直阻塞,又不想尝试一次不满足就退出,我想等待一段时间,超时之后再不等了该如何做?? 如下offer(e,timeout) offer(e,timeout) public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); }}跟put基本一样主要区别:
notFull.awaitNanos(纳秒数) public final long awaitNanos(long nanosTimeout) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); final long deadline = System.nanoTime() + nanosTimeout; int interruptMode = 0; while (!isOnSyncQueue(node)) { if (nanosTimeout <= 0L) { transferAfterCancelledWait(node); break; } if (nanosTimeout >= spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; nanosTimeout = deadline - System.nanoTime(); } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); return deadline - System.nanoTime();}跟await方法差不多,超时时间设置大于1000纳秒,调用LockSupport.park(timeout)有操作系统底层来调度。 方法都是类似,同理从队列往外取数据我们就看一个take()方法: public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); }}private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x;}队列为空直接阻塞,不为空取出items[takeIndex] takeIndex++
有了锁之后,BlockingQueue就变成了数组添加和移除的操作了!!!!! 跟操作阻塞队列代码似乎看完了,还剩余一些查询的,基本是修改阻塞队列的时候加上锁,排除锁的代码就是在操作一个数组环形队列,但我们可以在思考一些问题:
ArrayBlockingQueue的几个成员如下: final Object[] items;//final保证引用不会被修改
int takeIndex;
int putIndex
int count;
如何判断队列的大小?count ?
如何判断当前将要出队的元素 items[takeIndex]?
我之所以加上? 是因为考虑我在判断队列大小或者出队元素的时候其它线程有可能正在修改队列这个时候会差生一下影响,看下JDK是如何实现的? peek()方法 public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return itemAt(takeIndex); // null when queue is empty } finally { lock.unlock(); } } final E itemAt(int i) { return (E) items; } 加锁了,看下哪个元素将要出队也加锁了。 size()方法 public int size() { final ReentrantLock lock = this.lock; lock.lock(); try { return count; } finally { lock.unlock(); }}remainingCapacity()方法 public int remainingCapacity() { final ReentrantLock lock = this.lock; lock.lock(); try { return items.length - count; } finally { lock.unlock(); }}contains(Object obj)方法 public boolean contains(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { if (count > 0) { final int putIndex = this.putIndex; int i = takeIndex; do { if (o.equals(items)) return true; if (++i == items.length) i = 0; } while (i != putIndex); } return false; } finally { lock.unlock(); } }不出意料都是加上了锁 用equals比较返回第一个匹配,有多个相同的元素的话只是删除了第一个相同匹配的 总结: 1)ArrayBlockingQueue是有界的阻塞队列,不接受null
2)底层数据接口是数组,下标putIndex/takeIndex,构成一个环形FIFO队列
3)所有的增删改查数组公用了一把锁ReentrantLock,入队和出队数组下标和count变更都是靠这把锁来维护安全的。
4)阻塞的场景:1获取lock锁,2进入和取出还要满足condition 满了或者空了都等待出队和加入唤醒,ArrayBlockingQueue我们主要是put和take真正用到的阻塞方法(条件不满足)。
5)成员cout /putIndex、takeIndex是共享的,所以一些查询方法size、peek、toString、方法也是加上锁保证线程安全,但没有了并发损失了性能。
6)remove(Object obj) 返回了第一个equals的Object
|