A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

© 不二晨 金牌黑马   /  2018-12-18 16:51  /  2050 人查看  /  2 人回复  /   0 人收藏 转载请遵从CC协议 禁止商业使用本文

源码使用的是1.9版本;sync 包里的WaitGroup主要用于线程的同步;计数主线程创建的子线程(WaitGoup.Add(i));调用清除标记方法(WaitGroup.Done());使用WaitGroup.Wait()来阻塞,直到所有子线程(标记=0)执行完毕。
例子:

package main

import (
    "sync"
    "fmt"
)

func main(){
    var swg sync.WaitGroup
    for i:=0;i<3;i++{
        //增加一个计数器
        swg.Add(1)
        go func(wg *sync.WaitGroup,mark int){
            //减去计数器
            defer wg.Done()//等价于 wg.Add(-1)
            fmt.Printf("%d goroutine finish \n",mark)
        }(&swg,i)
    }
    //等待所有go程结束
    swg.Wait()
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
结果:

2 goroutine finish
1 goroutine finish
0 goroutine finish
1
2
3
注意!如果将代码改成下面这样(子线程函数,传入的参数是waitgroup的值拷贝),会出现什么情况呢?

func main(){
    var swg sync.WaitGroup
    for i:=0;i<3;i++{
        swg.Add(1)
        go func(wg sync.WaitGroup,mark int){
            defer wg.Done()
            fmt.Printf("%d goroutine finish \n",mark)
        }(swg,i)
    }
    swg.Wait()
}
1
2
3
4
5
6
7
8
9
10
11
结果:

2 goroutine finish
fatal error: all goroutines are asleep - deadlock!
1 goroutine finish

0 goroutine finish
goroutine 1 [semacquire]:
sync.runtime_Semacquire(0xc0420080dc)
    C:/Go/src/runtime/sema.go:56 +0x40
sync.(*WaitGroup).Wait(0xc0420080d0)
    C:/Go/src/sync/waitgroup.go:131 +0x79
1
2
3
4
5
6
7
8
9
10
11
出现死锁,因为子协程传入的waitGroup对象是一份新值拷贝,主协程的waitGroup并没有调用Done()方法,导致标志位无法被释放;各位童鞋在使用的时候,记得传入waitGroup的引用拷贝。

WaitGroup源码分析(精简了无关主要逻辑的代码)
1、首先查看WaitGroup的数据结构:

type WaitGroup struct {
    noCopy noCopy
    //共12个字节,低4字节用于记录wait等待次数,高8字节是计数器(64位机器是高8字节,32机器是中间4个字节,因为64位机器的原子操作需要64位的对齐,但是32位的编译器不能确保。)
    state1 [12]byte
    //用于唤醒go程的信号量
    sema   uint32
}
1
2
3
4
5
6
7
2、WaitGroup.Add()方法

func (wg *WaitGroup) Add(delta int) {
    statep := wg.state()
    //将标记为加delta
    state := atomic.AddUint64(statep, uint64(delta)<<32)
    //获得计数器数值
    v := int32(state >> 32)
    //获得wait()等待次数
    w := uint32(state)
    //标记位不能小于0(done过多或者Add()负值太多)
    if v < 0 {
        panic("sync: negative WaitGroup counter")
    }
    //不能并发的Add() 和Done()
    if w != 0 && delta > 0 && v == int32(delta) {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    //Add 完毕
    if v > 0 || w == 0 {
        return
    }
    //执行到这,此时计数器V=0;那么等待计数器肯定和整个state的值相等,不然只有一个情况:有人调了Add(),并且是并发调用的。
    if *statep != state {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    //所有状态位清零
    *statep = 0
    //唤醒等待的go程
    for ; w != 0; w-- {
        runtime_Semrelease(&wg.sema, false)
    }
}
//根据编译器位数,获得标志位和等待次数的数据域
func (wg *WaitGroup) state() *uint64 {
    if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 {
        return (*uint64)(unsafe.Pointer(&wg.state1))
    } else {
        return (*uint64)(unsafe.Pointer(&wg.state1[4]))
    }
}
// Done方法其实就是Add(-1)
func (wg *WaitGroup) Done() {
    wg.Add(-1)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
3、Wait方法

func (wg *WaitGroup) Wait() {
    statep := wg.state()
    //循环检查计数器V啥时候等于0
    for {
        state := atomic.LoadUint64(statep)
        v := int32(state >> 32)
        w := uint32(state)
        //v==0说明go程执行结束
        if v == 0 {
            return
        }
        //尚有未执行完的go程,等待标志位+1(直接在低位处理,无需移位)
        if atomic.CompareAndSwapUint64(statep, state, state+1) {
            runtime_Semacquire(&wg.sema)
            if *statep != 0 {
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            return
        }
    }
}
---------------------
【转载】仅作分享,侵删
作者:番薯粉
原文:https://blog.csdn.net/u010230794/article/details/82380380


2 个回复

倒序浏览
奈斯
回复 使用道具 举报
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马