黑马程序员技术交流社区
标题: 并发队列:无界阻塞队列 LinkedBlockingQueue 原理探究(1) [打印本页]
作者: hy2014051202 时间: 2017-7-19 14:23
标题: 并发队列:无界阻塞队列 LinkedBlockingQueue 原理探究(1)
一、前言
本文介绍下使用独占锁实现的阻塞队列LinkedBlockingQueue的实现。
二、 LinkedBlockingQueue类图结构
如图LinkedBlockingQueue中也有两个Node分别用来存放首尾节点,并且里面有个初始值为0的原子变量count用来记录队列元素个数,另外里面有两个ReentrantLock的独占锁,分别用来控制元素入队和出队加锁,其中takeLock用来控制同时只有一个线程可以从队列获取元素,其他线程必须等待,putLock控制同时只能有一个线程可以获取锁去添加元素,其他线程必须等待。另外notEmpty和notFull用来实现入队和出队的同步。 另外由于出入队是两个非公平独占锁,所以可以同时又一个线程入队和一个线程出队,其实这个是个生产者-消费者模型。
[Java] 纯文本查看 复制代码
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
/* Current number of elements /
private final AtomicInteger count = new AtomicInteger(0);
public static final int MAX_VALUE = 0x7fffffff;
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
//初始化首尾节点
last = head = new Node<E>(null);
}
如图默认队列容量为0x7fffffff;用户也可以自己指定容量。
三、必备基础
3.1 ReentrantLock
可以参考 https://www.atatech.org/articles/80539?flag_data_from=active。
3.2 条件变量(Condition)
条件变量这里使用的是takeLock.newCondition()获取也就是说调用ReentrantLock的方法获取的,那么可预见Condition使用了ReentrantLock的state。上面的参考没有提到所以这里串串讲下。
如图ConditionObject中两个node分别用来存放条件队列的首尾节点,条件队列就是调用条件变量的await方法被阻塞后的节点组成的单向链表。另外ConditionObject还要依赖AQS的state,ConditionObject是AQS类的一个内部类。
[Java] 纯文本查看 复制代码
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
//如果中断标志被设置了,则抛异常
if (Thread.interrupted())
throw new InterruptedException();
//添加当前线程节点到条件队列,
Node node = addConditionWaiter();
//当前线程释放独占锁
int savedState = fullyRelease(node);
long lastTime = System.nanoTime();
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
//挂起当前线程直到超时
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
long now = System.nanoTime();
nanosTimeout -= now - lastTime;
lastTime = now;
}
//unpark后,当前线程重新获取锁,有可能获取不到被放到AQS的队列
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return nanosTimeout - (System.nanoTime() - lastTime);
}
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
//释放锁,如果失败则抛异常
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
首先如果当前线程中断标志被设置了,直接抛出异常。添加当前线程节点(状态为:-2)到条件队列。
然后尝试释放当前线程拥有的锁并保存当前计数,可知如果当前线程调用awaitNano前没有使用当前条件变量所在的Reetenlock变量调用lock或者lockInterruptibly获取到锁,会抛出IllegalMonitorStateException异常。
然后调用park挂起当前线程直到超时或者其他线程调用了当前线程的unpark方法,或者调用了当前线程的interupt方法(这时候会抛异常)。
如果超时或者其他线程调用了当前线程的unpark方法,则当前线程从挂起变为激活,获取cpu资源后会继续执行,会重新获取锁。
[Java] 纯文本查看 复制代码
public final void signal() {
//如果当前线程没有持有锁,抛异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//从条件队列找第一个状态为CONDITION的,然后把状态变为0
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
//状态为CONDITION的,然后把状态变为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
//把条件队列的上面状态为0的节点放入AQS阻塞队列
Node p = enq(node);
int ws = p.waitStatus;
//调用unpark激活挂起的线程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
首先看调用signal的线程是不是持有了独占锁,没有则抛出异常。
然后获取在条件队列里面待的时间最长的node,把它移动到线程持有的锁所在的AQS队列。
其中enq方法就是把当前节点放入了AQS队列,但是这时候该节点还是在条件队列里面那,那么什么时候从条件队列移除那?其实在await里面的unlinkCancelledWaiters方法。
总结: 无论是条件变量的await和singal都是需要先获取独占锁才能调用,因为条件变量使用的就是独占锁里面的state管理状态,否者会报异常。
四 、带超时时间的offer操作-生产者
在队尾添加元素,如果队列满了,那么等待timeout时候,如果时间超时则返回false,如果在超时前队列有空余空间,则插入后返回true。
[Java] 纯文本查看 复制代码
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {
//空元素抛空指针异常
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
//获取可被中断锁,只有一个线程克获取
putLock.lockInterruptibly();
try {
//如果队列满则进入循环
while (count.get() == capacity) {
//nanos<=0直接返回
if (nanos <= 0)
return false;
//否者调用await进行等待,超时则返回<=0(1)
nanos = notFull.awaitNanos(nanos);
}
//await在超时时间内返回则添加元素(2)
enqueue(new Node<E>(e));
c = count.getAndIncrement();
//队列不满则激活其他等待入队线程(3)
if (c + 1 < capacity)
notFull.signal();
} finally {
//释放锁
putLock.unlock();
}
//c==0说明队列里面有一个元素,这时候唤醒出队线程(4)
if (c == 0)
signalNotEmpty();
return true;
}
private void enqueue(Node<E> node) {
last = last.next = node;
}
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
如果获取锁前面有线程调用了putLock. interrupt(),并且后面没有调用interrupted()重置中断标志,调用lockInterruptibly时候会抛出InterruptedException异常。
队列满的时候调用notFull.awaitNanos阻塞当前线程,当前线程会释放获取的锁,然后等待超时或者其他线程调用了notFull.signal()才会返回并重新获取锁,或者其他线程调用了该线程的interrupt方法设置了中断标志,这时候也会返回但是会抛出InterruptedException异常。
如果超时则直接返回false,如果超时前调用了notFull.signal()则会退出循环,执行(2)添加元素到队列,然后执行(3),(3)的目的是为了激活其他入队等待线程。(4)的话c==0说明队列里面已经有一个元素了,这时候就可以激活等待出队线程了。
另外signalNotEmpty函数是先获取独占锁,然后在调用的signal。
未完待续:因帖子长度限制,请看“并发队列:无界阻塞队列 LinkedBlockingQueue 原理探究(2)”
欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) |
黑马程序员IT技术论坛 X3.2 |