显然我们通过默认构造函数创建时,诞生的就是非公平锁,
static final class NonfairSync extends Sync { NonfairSync(int permits) { super(permits); } //调用父类Sync的nonfairTryAcquireShared protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }}
1
2
3
4
5
6
7
8
9
显然传入的许可数permits传递给了父类,最终会传给AQS中的state变量,也就是同步状态的变量,如下
//AQS中控制同步状态的state变量public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer { private volatile int state; protected final int getState() { return state; } protected final void setState(int newState) { state = newState; } //对state变量进行CAS 操作 protected final boolean compareAndSetState(int expect, int update) { return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
从这点可知,Semaphore的初始化值也就是state的初始化值。当我们调用Semaphore的acquire()方法后,执行过程是这样的,当一个线程请求到来时,如果state值代表的许可数足够使用,那么请求线程将会获得同步状态即对共享资源的访问权,并更新state的值(一般是对state值减1),但如果state值代表的许可数已为0,则请求线程将无法获取同步状态,线程将被加入到同步队列并阻塞,直到其他线程释放同步状态(一般是对state值加1)才可能获取对共享资源的访问权。调用Semaphore的acquire()方法后将会调用到AQS的acquireSharedInterruptibly()如下
//Semaphore的acquire()public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }/*** 注意Sync类继承自AQS* AQS的acquireSharedInterruptibly()方法*/ public final void acquireSharedInterruptibly(int arg) throws InterruptedException { //判断是否中断请求 if (Thread.interrupted()) throw new InterruptedException(); //如果tryAcquireShared(arg)不小于0,则线程获取同步状态成功 if (tryAcquireShared(arg) < 0) //未获取成功加入同步队列等待 doAcquireSharedInterruptibly(arg);}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
从方法名就可以看出该方法是可以中断的,也就是说Semaphore的acquire()方法也是可中断的。在acquireSharedInterruptibly()方法内部先进行了线程中断的判断,如果没有中断,那么先尝试调用tryAcquireShared(arg)方法获取同步状态,如果获取成功,那么方法执行结束,如果获取失败调用doAcquireSharedInterruptibly(arg);方法加入同步队列等待。这里的tryAcquireShared(arg)是个模板方法,AQS内部没有提供具体实现,由子类实现,也就是有Semaphore内部自己实现,该方法在Semaphore内部非公平锁的实现如下
//Semaphore中非公平锁NonfairSync的tryAcquireShared()protected int tryAcquireShared(int acquires) { //调用了父类Sync中的实现方法 return nonfairTryAcquireShared(acquires);}//Syn类中abstract static class Sync extends AbstractQueuedSynchronizer { final int nonfairTryAcquireShared(int acquires) { //使用死循环 for (;;) { int available = getState(); int remaining = available - acquires; //判断信号量是否已小于0或者CAS执行是否成功 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
nonfairTryAcquireShared(int acquires)方法内部,先获取state的值,并执行减法操作,得到remaining值,如果remaining不小于0,那么线程获取同步状态成功,可访问共享资源,并更新state的值,如果remaining大于0,那么线程获取同步状态失败,将被加入同步队列(通过doAcquireSharedInterruptibly(arg)),注意Semaphore的acquire()可能存在并发操作,因此nonfairTryAcquireShared()方法体内部采用无锁(CAS)并发的操作保证对state值修改的安全性。如何尝试获取同步状态失败,那么将会执行doAcquireSharedInterruptibly(int arg)方法
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { //创建共享模式的结点Node.SHARED,并加入同步队列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { //进入自旋操作 for (;;) { final Node p = node.predecessor(); //判断前驱结点是否为head if (p == head) { //尝试获取同步状态 int r = tryAcquireShared(arg); //如果r>0 说明获取同步状态成功 if (r >= 0) { //将当前线程结点设置为头结点并传播 setHeadAndPropagate(node, r); p.next = null; // help GC failed = false; return; } } //调整同步队列中node结点的状态并判断是否应该被挂起 //并判断是否需要被中断,如果中断直接抛出异常,当前结点请求也就结束 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) //结束该结点线程的请求 cancelAcquire(node); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
在方法中,由于当前线程没有获取同步状态,因此创建一个共享模式(Node.SHARED)的结点并通过addWaiter(Node.SHARED)加入同步队列,加入完成后,当前线程进入自旋状态,首先判断前驱结点是否为head,如果是,那么尝试获取同步状态并返回r值,如果r大于0,则说明获取同步状态成功,将当前线程设置为head并传播,传播指的是,同步状态剩余的许可数值不为0,通知后续结点继续获取同步状态,到此方法将会return结束,获取到同步状态的线程将会执行原定的任务。但如果前驱结点不为head或前驱结点为head并尝试获取同步状态失败,那么调用shouldParkAfterFailedAcquire(p, node)方法判断前驱结点的waitStatus值是否为SIGNAL并调整同步队列中的node结点状态,如果返回true,那么执行parkAndCheckInterrupt()方法,将当前线程挂起并返回是否中断线程的flag。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //获取当前结点的等待状态 int ws = pred.waitStatus; //如果为等待唤醒(SIGNAL)状态则返回true if (ws == Node.SIGNAL) return true; //如果ws>0 则说明是结束状态, //遍历前驱结点直到找到没有结束状态的结点 if (ws > 0) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //如果ws小于0又不是SIGNAL状态, //则将其设置为SIGNAL状态,代表该结点的线程正在等待唤醒。 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }private final boolean parkAndCheckInterrupt() { //将当前线程挂起 LockSupport.park(this); //获取线程中断状态,interrupted()是判断当前中断状态, //并非中断线程,因此可能true也可能false,并返回 return Thread.interrupted();}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
到此,加入同步队列的整个过程完成。这里小结一下,在AQS中存在一个变量state,当我们创建Semaphore对象传入许可数值时,最终会赋值给state,state的数值代表同一个时刻可同时操作共享数据的线程数量,每当一个线程请求(如调用Semaphored的acquire()方法)获取同步状态成功,state的值将会减少1,直到state为0时,表示已没有可用的许可数,也就是对共享数据进行操作的线程数已达到最大值,其他后来线程将被阻塞,此时AQS内部会将线程封装成共享模式的Node结点,加入同步队列中等待并开启自旋操作。只有当持有对共享数据访问权限的线程执行完成任务并释放同步状态后,同步队列中的对于的结点线程才有可能获取同步状态并被唤醒执行同步操作,注意在同步队列中获取到同步状态的结点将被设置成head并清空相关线程数据(毕竟线程已在执行也就没有必要保存信息了),AQS通过这种方式便实现共享锁,简单模型如下
前面我们分析的是可中断的请求,与只对应的不可中的的请求(这些方法都存在于AQS,由子类Semaphore间接调用)如下
//不可中的acquireShared()public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg);}private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) //没有抛出异常中的。。。。 interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node);//设置为头结点 /* * 尝试去唤醒队列中的下一个节点,如果满足如下条件: * 调用者明确表示"传递"(propagate > 0), * 或者h.waitStatus为PROPAGATE(被上一个操作设置) * 并且 * 下一个节点处于共享模式或者为null。 * * 这两项检查中的保守主义可能会导致不必要的唤醒,但只有在有 * 有在多个线程争取获得/释放同步状态时才会发生,所以大多 * 数情况下会立马获得需要的信号 */ if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) //唤醒后继节点,因为是共享模式,所以允许多个线程同时获取同步状态 doReleaseShared(); } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
显然与前面带中断请求doAcquireSharedInterruptibly(int arg)方法不同的是少线程中断的判断以及异常抛出,其他操作都一样,关于doReleaseShared(),放后面分析。ok~,了解完请求同步状态的过程,我们看看释放请求状态的过程,当每个线程执行完成任务将会释放同步状态,此时state值一般都会增加1。先从Semaphore的release()方法入手
//Semaphore的release()public void release() { sync.releaseShared(1);}//调用到AQS中的releaseShared(int arg) public final boolean releaseShared(int arg) { //调用子类Semaphore实现的tryReleaseShared方法尝试释放同步状态 if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
显然Semaphore间接调用了AQS中的releaseShared(int arg)方法,通过tryReleaseShared(arg)方法尝试释放同步状态,如果释放成功,那么将调用doReleaseShared()唤醒同步队列中后继结点的线程,tryReleaseShared(int releases)方法如下
//在Semaphore的内部类Sync中实现的protected final boolean tryReleaseShared(int releases) { for (;;) { //获取当前state int current = getState(); //释放状态state增加releases int next = current + releases; if (next < current) // overflow throw new Error("Maximum permit count exceeded"); //通过CAS更新state的值 if (compareAndSetState(current, next)) return true; } }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
逻辑很简单,释放同步状态,更新state的值,值得注意的是这里必须操作无锁操作,即for死循环和CAS操作来保证线程安全问题,因为可能存在多个线程同时释放同步状态的场景。释放成功后通过doReleaseShared()方法唤醒后继结点。
private void doReleaseShared() { /* * 保证释放动作(向同步等待队列尾部)传递,即使没有其他正在进行的 * 请求或释放动作。如果头节点的后继节点需要唤醒,那么执行唤醒 * 动作;如果不需要,将头结点的等待状态设置为PROPAGATE保证 * 唤醒传递。另外,为了防止过程中有新节点进入(队列),这里必 * 需做循环,所以,和其他unparkSuccessor方法使用方式不一样 * 的是,如果(头结点)等待状态设置失败,重新检测。 */ for (;;) { Node h = head; if (h != null && h != tail) { // 获取头节点对应的线程的状态 int ws = h.waitStatus; // 如果头节点对应的线程是SIGNAL状态,则意味着头 //结点的后继结点所对应的线程需要被unpark唤醒。 if (ws == Node.SIGNAL) { // 修改头结点对应的线程状态设置为0。失败的话,则继续循环。 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // 唤醒头结点h的后继结点所对应的线程 unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 如果头结点发生变化,则继续循环。否则,退出循环。 if (h == head) // loop if head changed break; }}//唤醒传入结点的后继结点对应的线程private void unparkSuccessor(Node node) { int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //拿到后继结点 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) //唤醒该线程 LockSupport.unpark(s.thread); }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
显然doReleaseShared()方法中通过调用unparkSuccessor(h)方法唤醒head的后继结点对应的线程。注意这里把head的状态设置为Node.PROPAGATE是为了保证唤醒传递,博主认为是可能同时存在多个线程并发争取资源,如果线程A已执行到doReleaseShared()方法中,正被唤醒后正准备替换head(实际上还没替换),而线程B又跑来请求资源,此时调用setHeadAndPropagate(Node node, int propagate)时,传入的propagate=0
if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) //唤醒后继节点,因为是共享模式,所以允许多个线程同时获取同步状态 doReleaseShared(); }
1
2
3
4
5
6
7
但为了保证持续唤醒后继结点的线程即doReleaseShared()方法被调用,可以把head的waitStatus设置为Node.PROPAGATE,这样就保证线程B也可以执行doReleaseShared()保证后续结点被唤醒或传播,注意doReleaseShared()可以同时被释放操作和获取操作调用,但目的都是为唤醒后继节点,因为是共享模式,所以允许多个线程同时获取同步状态。ok~,释放过程的分析到此完结,对于释放操作的过程还是相对简单些的,即尝试更新state值,更新成功调用doReleaseShared()方法唤醒后继结点对应的线程。 公平锁中的共享锁事实上公平锁的中的共享模式实现除了在获取同步状态时与非公平锁不同外,其他基本一样,看看公平锁的实现
static final class FairSync extends Sync { FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { //这里是重点,先判断队列中是否有结点再执行 //同步状态获取。 if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }