ReentrantLock内部存在3个实现类,分别是Sync、NonfairSync、FairSync,其中Sync继承自AQS实现了解锁tryRelease()方法,而NonfairSync(非公平锁)、 FairSync(公平锁)则继承自Sync,实现了获取锁的tryAcquire()方法,ReentrantLock的所有方法调用都通过间接调用AQS和Sync类及其子类来完成的。从上述类图可以看出AQS是一个抽象类,但请注意其源码中并没一个抽象的方法,这是因为AQS只是作为一个基础组件,并不希望直接作为直接操作类对外输出,而更倾向于作为基础组件,为真正的实现类提供基础设施,如构建同步队列,控制同步状态等,事实上,从设计模式角度来看,AQS采用的模板模式的方式构建的,其内部除了提供并发操作核心方法以及同步队列操作外,还提供了一些模板方法让子类自己实现,如加锁操作以及解锁操作,为什么这么做?这是因为AQS作为基础组件,封装的是核心并发操作,但是实现上分为两种模式,即共享模式与独占模式,而这两种模式的加锁与解锁实现方式是不一样的,但AQS只关注内部公共方法实现并不关心外部不同模式的实现,所以提供了模板方法给子类使用,也就是说实现独占锁,如ReentrantLock需要自己实现tryAcquire()方法和tryRelease()方法,而实现共享模式的Semaphore,则需要实现tryAcquireShared()方法和tryReleaseShared()方法,这样做的好处是显而易见的,无论是共享模式还是独占模式,其基础的实现都是同一套组件(AQS),只不过是加锁解锁的逻辑不同罢了,更重要的是如果我们需要自定义锁的话,也变得非常简单,只需要选择不同的模式实现不同的加锁和解锁的模板方法即可,AQS提供给独占模式和共享模式的模板方法如下
//AQS中提供的主要模板方法,由子类实现。public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer{ //独占模式下获取锁的方法 protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); } //独占模式下解锁的方法 protected boolean tryRelease(int arg) { throw new UnsupportedOperationException(); } //共享模式下获取锁的方法 protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } //共享模式下解锁的方法 protected boolean tryReleaseShared(int arg) { throw new UnsupportedOperationException(); } //判断是否为持有独占锁 protected boolean isHeldExclusively() { throw new UnsupportedOperationException(); }}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
在了解AQS的原理概要后,下面我们就基于ReetrantLock进一步分析AQS的实现过程,这也是ReetrantLock的内部实现原理。 基于ReetrantLock分析AQS独占模式实现过程ReetrantLock中非公平锁AQS同步器的实现依赖于内部的同步队列(FIFO的双向链表对列)完成对同步状态(state)的管理,当前线程获取锁(同步状态)失败时,AQS会将该线程以及相关等待信息包装成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会将头结点head中的线程唤醒,让其尝试获取同步状态。关于同步队列和Node结点,前面我们已进行了较为详细的分析,这里重点分析一下获取同步状态和释放同步状态以及如何加入队列的具体操作,这里从ReetrantLock入手分析AQS的具体实现,我们先以非公平锁为例进行分析。
//默认构造,创建非公平锁NonfairSyncpublic ReentrantLock() { sync = new NonfairSync();}//根据传入参数创建锁类型public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync();}//加锁操作public void lock() { sync.lock();}
1
2
3
4
5
6
7
8
9
10
11
12
13
前面说过sync是个抽象类,存在两个不同的实现子类,这里从非公平锁入手,看看其实现
/** * 非公平锁实现 */static final class NonfairSync extends Sync { //加锁 final void lock() { //执行CAS操作,获取同步状态 if (compareAndSetState(0, 1)) //成功则将独占锁线程设置为当前线程 setExclusiveOwnerThread(Thread.currentThread()); else //否则再次请求同步状态 acquire(1); }}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
这里获取锁时,首先对同步状态执行CAS操作,尝试把state的状态从0设置为1,如果返回true则代表获取同步状态成功,也就是当前线程获取锁成,可操作临界资源,如果返回false,则表示已有线程持有该同步状态(其值为1),获取锁失败,注意这里存在并发的情景,也就是可能同时存在多个线程设置state变量,因此是CAS操作保证了state变量操作的原子性。返回false后,执行 acquire(1)方法,该方法是AQS中的方法,它对中断不敏感,即使线程获取同步状态失败,进入同步队列,后续对该线程执行中断操作也不会从同步队列中移出,方法如下
public final void acquire(int arg) { //再次尝试获取同步状态 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}
1
2
3
4
5
6
这里传入参数arg表示要获取同步状态后设置的值(即要设置state的值),因为要获取锁,而status为0时是释放锁,1则是获取锁,所以这里一般传递参数为1,进入方法后首先会执行tryAcquire(arg)方法,在前面分析过该方法在AQS中并没有具体实现,而是交由子类实现,因此该方法是由ReetrantLock类内部实现的
//NonfairSync类static final class NonfairSync extends Sync { protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }//Sync类abstract static class Sync extends AbstractQueuedSynchronizer { //nonfairTryAcquire方法 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //判断同步状态是否为0,并尝试再次获取同步状态 if (c == 0) { //执行CAS操作 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //如果当前线程已获取锁,属于重入锁,再次获取锁后将status值加1 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); //设置当前同步状态,当前只有一个线程持有锁,因为不会发生线程安全问题,可以直接执行 setState(nextc); setState(nextc); return true; } return false; } //省略其他代码}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
从代码执行流程可以看出,这里做了两件事,一是尝试再次获取同步状态,如果获取成功则将当前线程设置为OwnerThread,否则失败,二是判断当前线程current是否为OwnerThread,如果是则属于重入锁,state自增1,并获取锁成功,返回true,反之失败,返回false,也就是tryAcquire(arg)执行失败,返回false。需要注意的是nonfairTryAcquire(int acquires)内部使用的是CAS原子性操作设置state值,可以保证state的更改是线程安全的,因此只要任意一个线程调用nonfairTryAcquire(int acquires)方法并设置成功即可获取锁,不管该线程是新到来的还是已在同步队列的线程,毕竟这是非公平锁,并不保证同步队列中的线程一定比新到来线程请求(可能是head结点刚释放同步状态然后新到来的线程恰好获取到同步状态)先获取到锁,这点跟后面还会讲到的公平锁不同。ok~,接着看之前的方法acquire(int arg)
public final void acquire(int arg) { //再次尝试获取同步状态 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt();}
shouldParkAfterFailedAcquire()方法的作用是判断当前结点的前驱结点是否为SIGNAL状态(即等待唤醒状态),如果是则返回true。如果结点的ws为CANCELLED状态(值为1>0),即结束状态,则说明该前驱结点已没有用应该从同步队列移除,执行while循环,直到寻找到非CANCELLED状态的结点。倘若前驱结点的ws值不为CANCELLED,也不为SIGNAL(当从Condition的条件等待队列转移到同步队列时,结点状态为CONDITION因此需要转换为SIGNAL),那么将其转换为SIGNAL状态,等待被唤醒。
若shouldParkAfterFailedAcquire()方法返回true,即前驱结点为SIGNAL状态同时又不是head结点,那么使用parkAndCheckInterrupt()方法挂起当前线程,称为WAITING状态,需要等待一个unpark()操作来唤醒它,到此ReetrantLock内部间接通过AQS的FIFO的同步队列就完成了lock()操作,这里我们总结成逻辑流程图
关于获取锁的操作,这里看看另外一种可中断的获取方式,即调用ReentrantLock类的lockInterruptibly()或者tryLock()方法,最终它们都间接调用到doAcquireInterruptibly()
private void doAcquireInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //直接抛异常,中断线程的同步状态请求 throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
最大的不同是
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //直接抛异常,中断线程的同步状态请求 throw new InterruptedException();
1
2
3
4
检测到线程的中断操作后,直接抛出异常,从而中断线程的同步状态请求,移除同步队列,ok~,加锁流程到此。下面接着看unlock()操作
//ReentrantLock类的unlockpublic void unlock() { sync.release(1);}//AQS类的release()方法public final boolean release(int arg) { //尝试释放锁 if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) //唤醒后继结点的线程 unparkSuccessor(h); return true; } return false;}//ReentrantLock类中的内部类Sync实现的tryRelease(int releases) protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; //判断状态是否为0,如果是则说明已释放同步状态 if (c == 0) { free = true; //设置Owner为null setExclusiveOwnerThread(null); } //设置更新同步状态 setState(c); return free; }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
释放同步状态的操作相对简单些,tryRelease(int releases)方法是ReentrantLock类中内部类自己实现的,因为AQS对于释放锁并没有提供具体实现,必须由子类自己实现。释放同步状态后会使用unparkSuccessor(h)唤醒后继结点的线程,这里看看unparkSuccessor(h)
private void unparkSuccessor(Node node) { //这里,node一般为当前线程所在的结点。 int ws = node.waitStatus; if (ws < 0)//置零当前线程所在的结点状态,允许失败。 compareAndSetWaitStatus(node, ws, 0); Node s = node.next;//找到下一个需要唤醒的结点s if (s == null || s.waitStatus > 0) {//如果为空或已取消 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0)//从这里可以看出,<=0的结点,都是还有效的结点。 s = t; } if (s != null) LockSupport.unpark(s.thread);//唤醒}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
从代码执行操作来看,这里主要作用是用unpark()唤醒同步队列中最前边未放弃线程(也就是状态为CANCELLED的线程结点s)。此时,回忆前面分析进入自旋的函数acquireQueued(),s结点的线程被唤醒后,会进入acquireQueued()函数的if (p == head && tryAcquire(arg))的判断,如果p!=head也不会有影响,因为它会执行shouldParkAfterFailedAcquire(),由于s通过unparkSuccessor()操作后已是同步队列中最前边未放弃的线程结点,那么通过shouldParkAfterFailedAcquire()内部对结点状态的调整,s也必然会成为head的next结点,因此再次自旋时p==head就成立了,然后s把自己设置成head结点,表示自己已经获取到资源了,最终acquire()也返回了,这就是独占锁释放的过程。
ok~,关于独占模式的加锁和释放锁的过程到这就分析完,总之呢,在AQS同步器中维护着一个同步队列,当线程获取同步状态失败后,将会被封装成Node结点,加入到同步队列中并进行自旋操作,当当前线程结点的前驱结点为head时,将尝试获取同步状态,获取成功将自己设置为head结点。在释放同步状态时,则通过调用子类(ReetrantLock中的Sync内部类)的tryRelease(int releases)方法释放同步状态,释放成功则唤醒后继结点的线程。 ReetrantLock中公平锁了解完ReetrantLock中非公平锁的实现后,我们再来看看公平锁。与非公平锁不同的是,在获取锁的时,公平锁的获取顺序是完全遵循时间上的FIFO规则,也就是说先请求的线程一定会先获取锁,后来的线程肯定需要排队,这点与前面我们分析非公平锁的nonfairTryAcquire(int acquires)方法实现有锁不同,下面是公平锁中tryAcquire()方法的实现
//公平锁FairSync类中的实现protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //注意!!这里先判断同步队列是否存在结点 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
每个Condition都对应着一个等待队列,也就是说如果一个锁上创建了多个Condition对象,那么也就存在多个等待队列。等待队列是一个FIFO的队列,在队列中每一个节点都包含了一个线程的引用,而该线程就是Condition对象上等待的线程。当一个线程调用了await()相关的方法,那么该线程将会释放锁,并构建一个Node节点封装当前线程的相关信息加入到等待队列中进行等待,直到被唤醒、中断、超时才从队列中移出。Condition中的等待队列模型如下
正如图所示,Node节点的数据结构,在等待队列中使用的变量与同步队列是不同的,Condtion中等待队列的结点只有直接指向的后继结点并没有指明前驱结点,而且使用的变量是nextWaiter而不是next,这点我们在前面分析结点Node的数据结构时讲过。firstWaiter指向等待队列的头结点,lastWaiter指向等待队列的尾结点,等待队列中结点的状态只有两种即CANCELLED和CONDITION,前者表示线程已结束需要从等待队列中移除,后者表示条件结点等待被唤醒。再次强调每个Codition对象对于一个等待队列,也就是说AQS中只能存在一个同步队列,但可拥有多个等待队列。下面从代码层面看看被调用await()方法(其他await()实现原理类似)的线程是如何加入等待队列的,而又是如何从等待队列中被唤醒的
public final void await() throws InterruptedException { //判断线程是否被中断 if (Thread.interrupted()) throw new InterruptedException(); //创建新结点加入等待队列并返回 Node node = addConditionWaiter(); //释放当前线程锁即释放同步状态 int savedState = fullyRelease(node); int interruptMode = 0; //判断结点是否同步队列(SyncQueue)中,即是否被唤醒 while (!isOnSyncQueue(node)) { //挂起线程 LockSupport.park(this); //判断是否被中断唤醒,如果是退出循环。 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } //被唤醒后执行自旋操作争取获得锁,同时判断线程是否被中断 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; // clean up if cancelled if (node.nextWaiter != null) //清理等待队列中不为CONDITION状态的结点 unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
执行addConditionWaiter()添加到等待队列。
private Node addConditionWaiter() { Node t = lastWaiter; // 判断是否为结束状态的结点并移除 if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } //创建新结点状态为CONDITION Node node = new Node(Thread.currentThread(), Node.CONDITION); //加入等待队列 if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
await()方法主要做了3件事,一是调用addConditionWaiter()方法将当前线程封装成node结点加入等待队列,二是调用fullyRelease(node)方法释放同步状态并唤醒后继结点的线程。三是调用isOnSyncQueue(node)方法判断结点是否在同步队列中,注意是个while循环,如果同步队列中没有该结点就直接挂起该线程,需要明白的是如果线程被唤醒后就调用acquireQueued(node, savedState)执行自旋操作争取锁,即当前线程结点从等待队列转移到同步队列并开始努力获取锁。
接着看看唤醒操作singal()方法
public final void signal() { //判断是否持有独占锁,如果不是抛出异常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; //唤醒等待队列第一个结点的线程 if (first != null) doSignal(first); }