本帖最后由 wuqiong 于 2018-6-12 11:23 编辑
2-3. ForkJoinPool中的队列
那么ForkJoinPool是怎样创建队列的呢?请看如下两段源代码片段:
/**
* Tries to add the given task to a submission queue at
* submitter's current queue. Only the (vastly) most common path
* is directly handled in this method, while screening for need
* for externalSubmit.
*/
// ForkJoinPool类中的方法
// 该方法试图将一个任务提交到一个submission queue中,随机提交
final void externalPush(ForkJoinTask<?> task) {
WorkQueue[] ws; WorkQueue q; int m;
// 取得一个随机探查数,可能为0也可能为其它数
int r = ThreadLocalRandom.getProbe();
// 获取当前ForkJoinPool的运行状态
int rs = runState;
// 最关键的操作在这里,详见后文说明
if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a; int am, n, s;
if ((a = q.array) != null && (am = a.length - 1) > (n = (s = q.top) - q.base)) {
int j = ((am & s) << ASHIFT) + ABASE;
// 以下三个原子操作首先是将task放入队列
U.putOrderedObject(a, j, task);
// 然后将“q”这个submission queue的top标记+1
U.putOrderedInt(q, QTOP, s + 1);
// 最后解除这个submission queue的锁定状态
U.putIntVolatile(q, QLOCK, 0);
// 如果条件成立,说明这时处于active的工作线程可能还不够
// 所以调用signalWork方法
if (n <= 1)
signalWork(ws, q);
return;
}
// 这里试图接除对这个submission queue的锁定状态
// 为什么会有两次接触呢?因为在之前代码中给队列加锁后,
// 可能队列的现有空间并不满足添加新的task的条件
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
externalSubmit(task);
}
......
/**
* Full version of externalPush, handling uncommon cases, as well
* as performing secondary initialization upon the first
* submission of the first task to the pool. It also detects
* first submission by an external thread and creates a new shared
* queue if the one at index if empty or contended.
*/
// 以下是externalSubmit方法的部分代码,用于初始化ForkJoinPool中的队列
private void externalSubmit(ForkJoinTask<?> task) {
......
// initialize
// 如果条件成立,就说明当前ForkJoinPool类中,还没有任何队列,所以要进行队列初始化
else if ((rs & STARTED) == 0 || ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
int ns = 0;
rs = lockRunState();
try {
if ((rs & STARTED) == 0) {
// 通过原子操作,完成“任务窃取次数”这个计数器的初始化
U.compareAndSwapObject(this, STEALCOUNTER, null, new AtomicLong());
// create workQueues array with size a power of two
// 这段代码也非常有趣,详见后文的分析。
int p = config & SMASK; // ensure at least 2 slots
int n = (p > 1) ? p - 1 : 1;
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
workQueues = new WorkQueue[n];
ns = STARTED;
}
} finally {
unlockRunState(rs, (rs & ~RSLOCK) | ns);
}
}
......
}
externalPush方法中的“q = ws[m & r & SQMASK]”代码非常重要。我们大致来分析一下作者的意图,首先m是ForkJoinPool中的WorkQueue数组长度减1,例如当前WorkQueue数组大小为16,那么m的值就为15;r是一个线程独立的随机数生成器,关于java.util.concurrent.ThreadLocalRandom类的功能和使用方式可参见其它资料;而SQMASK是一个常量,值为126 (0x7e)。以下是一种可能的计算过程和计算结果:
实际上任何数和126进行“与”运算,其结果只可能是0或者偶数,即0 、 2 、 4 、 6 、 8。也就是说以上代码中从名为“ws”的WorkQueue数组中,取出的元素只可能是第0个或者第偶数个队列。 我们再来看看以上代码给出的externalSubmit方法中,进行WorkQueue数组初始化的代码。当外部调用这通过submit、execute、invoke方法向ForkJoinPool提交一个计算任务时,会运行这段代码为ForkJoinPool创建多个WorkQueue并形成数组。其中以下代码片段用于确定这个即将创建的WorkQueue数组的大小
...... // SMASK是一个常量 static final int SMASK = 0xffff; ...... // 这是config的来源 // mode是ForkJoinPool构造函数中设定的asyncMode,如果为LIFO,则mode为0,否则为65536 // parallelism 为技术人员设置的(或者程序自行设定的)并发等级 this.config = (parallelism & SMASK) | mode; ...... // ensure at least 2 slots int p = config & SMASK; // n这个变量就是要计算的WorkQueue数组的大小 int n = (p > 1) ? p - 1 : 1; ...... n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1; ......
从以上整理的代码可以看出,最后确认ForkJoinPool中WorkQueue数组初始化大小的因素是名叫config的变量,而config变量又与构造ForkJoinPool时所传入的并发等级(parallelism)、异步模式(asyncMode)有关。在我们选择LIFO模式时,计算结果如下表所示(下表中的结果建立在mode = 0的前提下):
[td]parallelism | n | parallelism | n | 1 | 4 | 2 | 4 | 3 | 8 | 4 | 8 | 5 | 16 | 6 | 16 | 7 | 16 | 8 | 16 | 9 | 32 | 10 | 32 | … | 32 | 14 | 32 | … | 32 | 16 | 32 | 17 | 64 | … | … |
是的,计算结果“n”按照两倍规模进行扩展,并且在初始化时保证和并发级别设定的数量(parallelism)至少两倍的关系。这是为什么呢?这是因为ForkJoinPool中的这些WorkQueue和工作线程ForkJoinWorkerThread并不是一对一的关系,而是随时都有多余ForkJoinWorkerThread数量的WorkQueue元素。而这个ForkJoinPool中的WorkQueue数组中,索引位为非奇数的工作队列用于存储从外部提交到ForkJoinPool中的任务,也就是所谓的submissions queue;索引位为偶数的工作队列用于存储归并计算过程中等待处理的子任务,也就是task queue。 这样我们也就可以明白,ForkJoinPool中重写的toString()方法,是如何取得submissions、tasks、steals和running监控数据,请看以下toString()方法的源码片段:
public String toString() { ...... WorkQueue[] ws; WorkQueue w; if ((ws = workQueues) != null) { // 循环着,依次遍历当前ForkJoinPool中WorkQueue数组的每一个元素 for (int i = 0; i < ws.length; ++i) { if ((w = ws) != null) { // 获取当前WorkQueue中元素的数量 // (WorkQueue也是使用数组方式存储这些元素) int size = w.queueSize(); // 如果当前数组元素的索引位为非奇数 // 说明是submissions queue,这时submissions计数器发生累加 if ((i & 1) == 0) qs += size; else { // 否则说明是task queue,这时tasks计数器增加 qt += size; // 通过nsteals属性,获得这个task queue中任务被“窃取”的次数 st += w.nsteals; // 如果条件成立,就说明当前task queue所对应的工作线程 // 没有被任何方式阻塞,所以running计数器增加 if (w.isApparentlyUnblocked()) ++rc; } } } } ...... return super.toString() + ...... ", running = " + rc + ", steals = " + st + ", tasks = " + qt + ", submissions = " + qs + "]"; }
注释比较详细,而且比起workqueue的入队出队逻辑和任务窃取逻辑,以上代码就是非常简单了。所以这里就不再赘述代码过程了。注意,以上代码分析基于JDK 1.8的源码分析,而与JDK1.7中ForkJoinPool的实现有较大差异。
2-4. 避免伪共享
所谓伪共享是指多核CPU在无锁情况下同时抢占同一缓存行的写操作权限所引起的频繁切换操作。是不是不好理解这句话?那么我们就采用图文方式来说明一下伪共享产生的场景: 上图是目前典型的计算机硬件层组织结构(只画了北桥部分,南桥部分不在我们讨论范围内,就忽略了),主存和CPU之间由北桥芯片负责数据交换,北桥芯片和CPU间的通讯速度被称为前端总线速度(和芯片类型、总线频率等因素有关),虽然基于目前的技术基础,前端总线速度都比较快,但总的来说还是算比较珍贵的计算资源。CPU内部结构主要有计算内核、一级缓存(L1)、二级缓存(L2)和三级缓存(L3),其中L1缓存为每个计算内核所独享的,L2缓存是否由每个计算内核所独享视不同的CPU型号不同而有所区别,L3缓存一般为CPU内核锁共享。L3、L2、L1缓存的内部读写速度和制造成本依次递增,这就意味着它们的容量依次递减。以AMD 锐龙5 1400 这款CPU为例,这是一款4核民用级CPU,L1 数据缓存(L1 Data)大小为 4 × 32KBytes、L2 缓存 4 × 64KBytes、L3 缓存2 × 4MBytes。注意L1级缓存除了数据缓存以外还有独立的指令缓存(L1 Inst),另外AMD 锐龙5 1400这款CPU的L3缓存为每两核共享一组,所以会出现2 × 4MBytes的情况。 为了避免CPU内核和相对缓慢的主存随时进行数据交互,CPU对在计算时对线程中数据的读写主要依靠这几级缓存进行,再通过前段总线将缓存中的数据和CPU数据进行同步。注意,真的只是相对而言的慢,DDR3/DDR4的内存速度还是很快的,DDR3-1600的读写速度可以达到 2.2 GB/s,而CPU L1缓存的数据读取速度可以达到 900 GB/s! CPU对于数据缓存的操作单位是缓存行,也就是说当进行CPU要对内存中的数据进行操作时,CPU会将内存地址和临近地址的数据全部装入缓存后,再进行操作(实际上现代计算机系统为了加快I/O性能,大量采用这样的操作原则,例如在“数据存储”专题讲到MySQL的InnoDB数据引擎从磁盘上读取数据到内存时,也是采用“Page”为单位进行的)。例如给出的CPU缓存行每一行都有64个字节(部分型号CPU的缓存行为32个字节),而一个长整型的大小为64位也就是8个字节,那么一个缓存行就可以容纳8个长整型(而实际上缓存行头部会使用8字节加载一些描述信息,所以实际上最多能存7个长整型)。 如果工作在两个不同内核的两个线程(或者多个线程),需要对同一内存地址的数据进行写操作该怎么办呢?这里就要提到一个协议——MESI协议(可以参看https://en.wikipedia.org/wiki/MESI_protocol,或者这本书《What Every Programmer Should Know About Memory》)。简单的来说,如果CPU内核要对缓存行的数据进行写操作时,首先根据这个缓存行的状态,分析该缓存行的数据是否可能也存在于其它CPU内核的缓存行中(缓存行状态为S),如果有则发送一个RFO请求到其它CPU,让其它CPU设定缓存行的状态为Invalid(无效状态),这样就可以保证只有本CPU能够对缓存行的数据进行写操作。 那么问题就很明显了,如果这些内核不停的相互请求对同一数据的写操作权限,就会出现资源抢占的情况,导致各个CPU内核L1和L2缓存全部失效,最后都不得不到L3甚至主存中重读数据。这都还算好比较好的情况,上文已经说过CPU对数据的操作以缓存行为单位,而一个缓存行可容纳多个数据(这里记为A数据和B数据),CPU内核1需要对数据A进行修改,同时CPU内核2需要对数据B进行修改。当这样的情况出现时,从技术人员角度来看好像并没有出现资源抢占,但实际上在多个CPU内核中发生的情况却发生了不必要的抢占。 这就是伪共享,这个问题在Java中当然有解决办法,但是相对比较“暴力”。原理就是让A数据和B数据处于不同的缓存行——缓存行在存储A数据后多出来的空间,采用一些无用的基本数据进行“补全”。这样B数据就可以处于不同缓存行了,如下代码所示:
...... // 这个是真实业务需要的属性,是一个长整型 public volatile long a = 0L; // 需要补6个长整型 public long a1, a2, a3, a4, a5, a6;
//================= // 值得一提的是,有的CPU缓存航为64字节,所以为了兼容性更好一点 // 你可以一直填充14个长整形 public long a1, a2, a3, a4, a5, a6; public volatile long a = 0L; public long a7, a8, a9, a10, a11, a12, a13, a14; ......
以上是Java代码的示例,是JDK1.7以及之前版本使用的一种解决方式。而JDK 1.8中有了一个更便利的解决方式,就是“@sun.misc.Contended”标记,而它的一个典型应用场景就在ForkJoinPool中:
.... @sun.misc.Contended public class ForkJoinPool extends AbstractExecutorService { .... } ....
2-5. ForkJoinPool工作监控
ForkJoinPool重写了toString()方法,以便技术人员在代码调试或者其它需要临时监控ForkJoinPool运行情况的场景下,轻松获取ForkJoinPool中的主要工作状态。以下运行效果展示了ForkJoinPool类的toString()方法打印的情况:
********************** @2503dbd3[Running, parallelism = 6, size = 9, active = 7, running = 7, steals = 14, tasks = 224, submissions = 0] ********************** @2503dbd3[Running, parallelism = 6, size = 9, active = 7, running = 7, steals = 14, tasks = 213, submissions = 0] ********************** @2503dbd3[Running, parallelism = 6, size = 9, active = 7, running = 7, steals = 14, tasks = 164, submissions = 0] ********************** @2503dbd3[Running, parallelism = 6, size = 9, active = 9, running = 9, steals = 22, tasks = 281, submissions = 0] ********************** @2503dbd3[Running, parallelism = 6, size = 9, active = 8, running = 8, steals = 22, tasks = 212, submissions = 0] ********************** @2503dbd3[Running, parallelism = 6, size = 9, active = 7, running = 7, steals = 22, tasks = 192, submissions = 0] **********************
以上的代码在对1亿条数据数进行排序时,有百万分之一的概率阻塞排序子任务。在子任务进行归并计算时,可强制让计算线程阻塞1秒。这时我们再执行这个应用程序并且进行监控,那么以下可能就是我们会看到的监控信息了:
********************** @2503dbd3[Running, parallelism = 4, size = 7, active = 5, running = 3, steals = 21, tasks = 154, submissions = 0] ********************** @2503dbd3[Running, parallelism = 4, size = 7, active = 5, running = 0, steals = 21, tasks = 142, submissions = 0] ********************** @2503dbd3[Running, parallelism = 4, size = 7, active = 5, running = 2, steals = 25, tasks = 131, submissions = 0] ********************** @2503dbd3[Running, parallelism = 4, size = 7, active = 7, running = 2, steals = 28, tasks = 212, submissions = 0] ********************** @2503dbd3[Running, parallelism = 4, size = 7, active = 7, running = 3, steals = 28, tasks = 216, submissions = 0] ********************** @2503dbd3[Running, parallelism = 4, size = 7, active = 6, running = 0, steals = 28, tasks = 148, submissions = 0] ********************** @2503dbd3[Running, parallelism = 4, size = 7, active = 5, running = 1, steals = 28, tasks = 142, submissions = 0] **********************
因为是随机概率,所以读者自行运行的监控效果和这里给出的监控效果是不同的。
======================== (接下文)
|