go的sync包中所有的结构都适用于goroutine并发执行的情况
sync.Once包
一个示例
1 | package main |
虽然起10个goroutine去调用onceBody,但是使用once.Do函数将onceBody包裹之后,onceBody只会执行一次.运行该代码,结果如下:
1 | Only once |
代码分析
设想一下原理
应该是once结构体中有个bool值决定是否已经执行过该函数.如果没有执行,首先加锁,然后执行函数,执行完毕之后修改bool值并且释放锁
sync.Once结构
1 | type Once struct { |
一个锁,一个done的uint32值
1 | func (o *Once) Do(f func()) { |
加锁,执行,最后置done标记.done为uint32估计是因为可以使用atomic包中的LoadUint32和StoreUint32原子性的加载和存储.
sync.Pool包
New字段定义如何生成对象,然后可以通过pool.Get()和pool.Put()获取和放回对象.一系列临时对象的对象池
关键结构体如下:
1 | type Pool struct { |
local可以理解为指向一个poolLocal数组,大小由localSize指定(localSize由cpu cores决定).
New指定一个函数,用来创建Pool中的对象
1 | type poolLocal struct { |
poolLocal中关键的字段为private和shared,private保存只能由相应的P(go调度器中概念,P/M/G)使用的对象.shared中保存可以由所有的P共同使用的对象
我们看一下Put的代码
1 | func (p *Pool) Put(x interface{}) { |
关键路径为首先将要放回的对象放入poolLocal中的private,否则放入共享的shared结构.
看一下Get()的代码路径
1 | func (p *Pool) Get() interface{} { |
首先从poolLocal的private中获取,如果未获取到则获取shared的最后一个元素,否则调用getSlow函数.
依然获取不到的话调用pool结构中的New函数生成.
1 | func (p *Pool) getSlow() (x interface{}) { |
getSlow函数会从当前P的下一个P开始依次遍历其他poolLocal中的shared结构去获取对象.
问题
- runtime_procPin()会返回当前P的pid,需要了解go scheduler相关知识加深理解
- pool中会注册一个runtime_registerPoolCleanup(poolCleanup),与Go GC相关,需了解go GC相关知识
sync.Cond包
条件变量,控制goroutine间的同步位点
通过NewCond生成一个Cond,调用cond.Wait()之后阻塞,其他goroutine调用cond.Signal()或者cond.Broadcast()唤醒wait的一个或者全部goroutine
一个示例
1 | c.L.Lock() |
一个goroutine中要执行Wait之前,首先将cond中的c.L.Lock()
关键结构体和方法
1 | type Cond struct { |
1 | func NewCond(l Locker) *Cond { |
调用NewCond需要传入一个实现了Locker接口的结构
1 | func (c *Cond) Wait() { |
Wait函数首先调用runtime将c.notify加入notifyList,然后解锁,然后等待.当条件变量满足后首先获取锁后Wait才会返回
1 |
|
Signal()函数唤醒一个在等待的goroutine
1 | func (c *Cond) Broadcast() { |
Broadcast唤醒全部等待的goroutine
问题
- 进一步的实现依赖于runtime,需要熟悉runtime