A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

一、前言

本文介绍下使用独占锁实现的阻塞队列LinkedBlockingQueue的实现。

二、 LinkedBlockingQueue类图结构


如图LinkedBlockingQueue中也有两个Node分别用来存放首尾节点,并且里面有个初始值为0的原子变量count用来记录队列元素个数,另外里面有两个ReentrantLock的独占锁,分别用来控制元素入队和出队加锁,其中takeLock用来控制同时只有一个线程可以从队列获取元素,其他线程必须等待,putLock控制同时只能有一个线程可以获取锁去添加元素,其他线程必须等待。另外notEmpty和notFull用来实现入队和出队的同步。 另外由于出入队是两个非公平独占锁,所以可以同时又一个线程入队和一个线程出队,其实这个是个生产者-消费者模型。
[Java] 纯文本查看 复制代码
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
/* Current number of elements /
private final AtomicInteger count = new AtomicInteger(0);

public static final int   MAX_VALUE = 0x7fffffff;

public LinkedBlockingQueue() {
    this(Integer.MAX_VALUE);
}

  public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    //初始化首尾节点
    last = head = new Node<E>(null);
}
如图默认队列容量为0x7fffffff;用户也可以自己指定容量。

三、必备基础

3.1 ReentrantLock

可以参考 https://www.atatech.org/articles/80539?flag_data_from=active。

3.2 条件变量(Condition)

条件变量这里使用的是takeLock.newCondition()获取也就是说调用ReentrantLock的方法获取的,那么可预见Condition使用了ReentrantLock的state。上面的参考没有提到所以这里串串讲下。

  • 首先看下类图结构



如图ConditionObject中两个node分别用来存放条件队列的首尾节点,条件队列就是调用条件变量的await方法被阻塞后的节点组成的单向链表。另外ConditionObject还要依赖AQS的state,ConditionObject是AQS类的一个内部类。

  • awaitNanos操作

[Java] 纯文本查看 复制代码
public final long awaitNanos(long nanosTimeout)
        throws InterruptedException {

    //如果中断标志被设置了,则抛异常
    if (Thread.interrupted())
        throw new InterruptedException();

    //添加当前线程节点到条件队列,
    Node node = addConditionWaiter();

    //当前线程释放独占锁
    int savedState = fullyRelease(node);
    long lastTime = System.nanoTime();
    int interruptMode = 0;

    while (!isOnSyncQueue(node)) {
        if (nanosTimeout <= 0L) {
            transferAfterCancelledWait(node);
            break;
        }
        //挂起当前线程直到超时
        LockSupport.parkNanos(this, nanosTimeout);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;

        long now = System.nanoTime();
        nanosTimeout -= now - lastTime;
        lastTime = now;
    }

    //unpark后,当前线程重新获取锁,有可能获取不到被放到AQS的队列
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
    return nanosTimeout - (System.nanoTime() - lastTime);
}


    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();

            //释放锁,如果失败则抛异常
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }
首先如果当前线程中断标志被设置了,直接抛出异常。添加当前线程节点(状态为:-2)到条件队列。

然后尝试释放当前线程拥有的锁并保存当前计数,可知如果当前线程调用awaitNano前没有使用当前条件变量所在的Reetenlock变量调用lock或者lockInterruptibly获取到锁,会抛出IllegalMonitorStateException异常。

然后调用park挂起当前线程直到超时或者其他线程调用了当前线程的unpark方法,或者调用了当前线程的interupt方法(这时候会抛异常)。

如果超时或者其他线程调用了当前线程的unpark方法,则当前线程从挂起变为激活,获取cpu资源后会继续执行,会重新获取锁。

  • signal操作

[Java] 纯文本查看 复制代码
public final void signal() {

    //如果当前线程没有持有锁,抛异常
    if (!isHeldExclusively())
        throw new IllegalMonitorStateException();

    //从条件队列找第一个状态为CONDITION的,然后把状态变为0
    Node first = firstWaiter;
    if (first != null)
        doSignal(first);
}

private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

final boolean transferForSignal(Node node) {


    //状态为CONDITION的,然后把状态变为0
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
        return false;


    //把条件队列的上面状态为0的节点放入AQS阻塞队列
    Node p = enq(node);
    int ws = p.waitStatus;

    //调用unpark激活挂起的线程
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
首先看调用signal的线程是不是持有了独占锁,没有则抛出异常。
然后获取在条件队列里面待的时间最长的node,把它移动到线程持有的锁所在的AQS队列。

其中enq方法就是把当前节点放入了AQS队列,但是这时候该节点还是在条件队列里面那,那么什么时候从条件队列移除那?其实在await里面的unlinkCancelledWaiters方法。

总结: 无论是条件变量的await和singal都是需要先获取独占锁才能调用,因为条件变量使用的就是独占锁里面的state管理状态,否者会报异常。

四 、带超时时间的offer操作-生产者

在队尾添加元素,如果队列满了,那么等待timeout时候,如果时间超时则返回false,如果在超时前队列有空余空间,则插入后返回true。
[Java] 纯文本查看 复制代码
public boolean offer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {

    //空元素抛空指针异常
    if (e == null) throw new NullPointerException();
    long nanos = unit.toNanos(timeout);
    int c = -1;
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;

    //获取可被中断锁,只有一个线程克获取
    putLock.lockInterruptibly();
    try {

        //如果队列满则进入循环
        while (count.get() == capacity) {
            //nanos<=0直接返回
            if (nanos <= 0)
                return false;
            //否者调用await进行等待,超时则返回<=0(1)
            nanos = notFull.awaitNanos(nanos);
        }
        //await在超时时间内返回则添加元素(2)
        enqueue(new Node<E>(e));
        c = count.getAndIncrement();

        //队列不满则激活其他等待入队线程(3)
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        //释放锁
        putLock.unlock();
    }

    //c==0说明队列里面有一个元素,这时候唤醒出队线程(4)
    if (c == 0)
        signalNotEmpty();
    return true;
}

private void enqueue(Node<E> node) {   
    last = last.next = node;
}

    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }
如果获取锁前面有线程调用了putLock. interrupt(),并且后面没有调用interrupted()重置中断标志,调用lockInterruptibly时候会抛出InterruptedException异常。

队列满的时候调用notFull.awaitNanos阻塞当前线程,当前线程会释放获取的锁,然后等待超时或者其他线程调用了notFull.signal()才会返回并重新获取锁,或者其他线程调用了该线程的interrupt方法设置了中断标志,这时候也会返回但是会抛出InterruptedException异常。

如果超时则直接返回false,如果超时前调用了notFull.signal()则会退出循环,执行(2)添加元素到队列,然后执行(3),(3)的目的是为了激活其他入队等待线程。(4)的话c==0说明队列里面已经有一个元素了,这时候就可以激活等待出队线程了。

另外signalNotEmpty函数是先获取独占锁,然后在调用的signal。

未完待续因帖子长度限制,请看“并发队列:无界阻塞队列 LinkedBlockingQueue 原理探究(2)”

0 个回复

您需要登录后才可以回帖 登录 | 加入黑马