golang系列之-程序运行流程及GMP模型

本文仅介绍程序运行流程以及GMP如何寻找g并运行,其他如抢占、死锁、信号处理、profiling等内容不打算深入。当前go版本:1.24

前提

先讲几个概念

进程、线程、协程

  1. 进程:程序的一个实例,也是操作系统的一个task,操作系统的资源分配最小单位
  2. 线程:一种概念,操作系统调度的最小单位,一个进程可以包含多个线程,线程之间共享内存、文件描述符等资源。不同操作系统的实现并不一致,linux下进程与线程的结构都是task_structure,也就是说他们是一样的,不同线程之间用指针指向同一份资源如内存空间实现共享。
  3. 协程:也称为用户态线程,由应用程序自行实现/调度,一般情况下是协作式的,由开发者决定task何时让出cpu,go支持抢占式调度

线程与协程的映射模型

  1. 1:1模型,每个用户线程对应一个内核线程,如在c中pthread创建的线程,此时也可以理解为用户态线程就是内核态线程
  2. 1:N模型,一个内核线程对应多个用户线程,无法充分利用多核的并行性,现已淘汰
  3. M:N模型,多个用户线程对应多个内核线程,实现较复杂,使用者较少,go是其中一个

GMP模型

go早期的M:N模型遇到一些性能问题,如锁竞争激烈、线程创建/销毁频繁、CPU缓存失效等,为了解决这些问题引入了P。在GMP模型中

  1. G-goroutine,用户态线程
  2. M-machine,系统线程相关
  3. P-processor,缓存、调度上下文等,其数量一般与CPU核心数量一致。P的出现使得调度变得本地化,避免全局锁竞争,提升了CPU缓存命中率等,最终使得go的并发调度更加高效

go程序的运行流程

go程序启动时的入口是_rt0_amd64,该函数是汇编代码,具体如下

1
2
3
4
5
6
// src/runtime/asm_amd64.s
// 系统入口点
TEXT _rt0_amd64(SB),NOSPLIT,$-8
MOVQ 0(SP), DI // argc
LEAQ 8(SP), SI // argv
JMP runtime·rt0_go(SB)

runtime·rt0_go也是汇编代码,比较长,主要逻辑如下

  1. g0、m0双向绑定(g0、m0是全局变量,静态编译,因此指针已知,放在src/runtime/proc.go)
  2. runtime·args - 复制命令行参数(args函数放在src/runtime/runtime1.go)
  3. runtime·osinit - 系统初始化(osinit函数放在src/runtime/os_linux.go)
  4. runtime·schedinit - 调度器初始化(schedinit函数放在src/runtime/proc.go)
  5. runtime·mainPC - 纪录runtime·main的地址(main函数放在src/runtime/proc.go,其内部调用main_main,也就是用户自己编写的main函数)
  6. runtime·newproc - 创建G用于运行runtime·main,放到p的runq里,等待调度
  7. runtime·mstart - 运行runtime·mstart(汇编函数,实际调用的是runtime·mstart0,放在src/runtime/proc.go,内部进行栈初始化、信号注册等,最后运行调度函数schedule)

数据结构

G - goroutine

goroutine的数据结构如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// src/runtime/runtime2.go
// macOS下共440字节,虽然结构比较大,但GMP调度场景下使用的字段并不多
type g struct {
// 栈相关
stack stack // 最小为2KB,最大为1GB
stackguard0 uintptr // 用于正常函数调用时检查
stackguard1 uintptr // 用于调度恢复执行时检查

_panic *_panic // panic链
_defer *_defer // defer链
m *m // 当前m
sched gobuf // 上下文如pc、sp等信息
syscallsp uintptr // 系统调用时用
syscallpc uintptr // 系统调用时用
syscallbp uintptr // 系统调用时用
stktopsp uintptr // 栈顶sp

param unsafe.Pointer // 通用指针,存储sudog等
atomicstatus atomic.Uint32 // 状态,用的比较多,具体看后面列表
stackLock uint32 //
goid uint64 // 全局唯一id,从p获取,如果p没有,从sched获取
schedlink guintptr // next指针,下一个g,用于全局队列
waitsince int64 // 等待时刻
waitreason waitReason // 挂起原因

preempt bool // 抢占标志
preemptStop bool //
preemptShrink bool //

asyncSafePoint bool //

paniconfault bool //
gcscandone bool // 是否已经完成栈扫描
throwsplit bool // 是否允许栈分裂/扩容

activeStackChans bool //

parkingOnChan atomic.Bool //

inMarkAssist bool // 是否需要协助GC标记
coroexit bool //

raceignore int8 //
nocgocallback bool //
tracking bool // trackingSeq%8 == 0
trackingSeq uint8 // 计数器,初始为一个随机数
trackingStamp int64 // 开始时刻(计算_Gwaiting、_Grunnable阶段耗时)
runnableTime int64 // _Grunnable总耗时
lockedm muintptr // m,锁定时用
fipsIndicator uint8 //
sig uint32 //
writebuf []byte //
sigcode0 uintptr //
sigcode1 uintptr //
sigpc uintptr //
parentGoid uint64 // 父g的goid
gopc uintptr // 父g的pc
ancestors *[]ancestorInfo // 父g的指针放到一个列表
startpc uintptr // G的函数,=fn.fn
racectx uintptr // 冲突检测相关
waiting *sudog // sudog指针,channel、select用
cgoCtxt []uintptr //
labels unsafe.Pointer // g为user类型时,父子共享
timer *timer // 定时器,time.Sleep用
sleepWhen int64 // 定时器,超时时间
selectDone atomic.Uint32 //

goroutineProfiled goroutineProfileStateHolder // 新g需要标记不需要profile

coroarg *coro //
syncGroup *synctestGroup // 测试相关,g为user类型时,父子共享

trace gTraceState // trace相关数据

// GC Assist额度,或称为GC助攻积分,字节数。该值为负数时需要协助GC扫描,避免STW过长
gcAssistBytes int64
}

// g0,全局变量,只有第一个g0是编译期间生成的,其他g0在创建m绑定p时动态生成。不可被抢占
var g0 g

类型

user:大部分的g都是user类型,执行用户代码
sys :大部分runtime开头的函数等都是sys类型,除了少部分如runtime.main外

状态列表

status_name status_value description
_Gidle 0x00 G已创建但未初始化
_Grunnable 0x01 G放在runq,还未运行,未拥有stack
_Grunning 0x02 G正在运行用户代码,拥有stack,已绑定M、P,不在runq
_Gsyscall 0x03 G在执行系统调用(内核态),未运行用户代码,已绑定M,不在runq
_Gwaiting 0x04 G阻塞中,未运行用户代码,未拥有stack,不在runq
_Gmoribund_unused 0x05 此状态未被使用
_Gdead 0x06 墓碑,G被M剥离并存储于free链表,或从free链表获取绑定中
_Genqueue_unused 0x07 此状态未被使用
_Gcopystack 0x08 迁移stack中,未运行用户代码,不在runq
_Gpreempted 0x09 G被抢占,类似_Gwaiting
_Gscan 0x1000 GC在扫描stack,未运行用户代码,拥有stack,可以组合上述其他标记
_Gscanrunnable 0x1001 同_Gscan+_Grunnable
_Gscanrunning 0x1002 同_Gscan+_Grunning
_Gscansyscall 0x1003 同_Gscan+_Gsyscall
_Gscanwaiting 0x1004 同_Gscan+_Gwaiting
_Gscanpreempted 0x1009 同_Gscan+_Gpreempted

栈初始大小

普通g的栈大小初始为2KB,非固定大小,可扩容,最大1GB。当然,初始栈大小也不是一成不变的,系统在GC时计算平均栈大小并更新变量。g0的栈大小因系统而异,如下

OS 是否系统分配 (mStackIsSystemAllocated) g0 栈大小
Linux ❌(Go 运行时自己分配) 16KB
macOS ✅(系统分配) 2MB
iOS ✅(系统分配) 2MB
Windows ✅(系统分配) 1MB
Solaris ✅(系统分配) 8MB(可能)
OpenBSD 部分情况 ✅(mips64 除外) 未知,可能 8MB

M - machine

线程相关数据结构m如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
// macOS下共1912字节
type m struct {
g0 *g // g0,调度用
morebuf gobuf //
divmod uint32 //
_ uint32 //

procid uint64 // 线程ID,由系统提供
gsignal *g // g,信号处理用,32KB的栈
goSigStack gsignalStack //
sigmask sigset // 信号掩码
tls [tlsSlots]uintptr //
mstartfn func() // 线程启动时调用
curg *g // 当前g,user/sys类型
caughtsig guintptr //
p puintptr // 当前p
nextp puintptr // 获取的p先绑定在这个字段
oldp puintptr // m、p取消绑定时纪录
id int64 // 全局唯一ID,由mReserveID提供
mallocing int32 //
throwing throwType //
preemptoff string // 阻止抢占的标志,调试、特殊场景用
locks int32 // 计数器,阻止抢占
dying int32 //
profilehz int32 //
spinning bool // 自旋寻找工作中
blocked bool // 阻塞状态,跟park字段搭配使用
newSigstack bool //
printlock int8 //
incgo bool // 是否是c创建的
isextra bool //
isExtraInC bool //
isExtraInSig bool //
freeWait atomic.Uint32 // 销毁状态
needextram bool //
g0StackAccurate bool //
traceback uint8 //
ncgocall uint64 //
ncgo int32 //
cgoCallersUse atomic.Uint32 //
cgoCallers *cgoCallers //
park note // semaphore
alllink *m // next指针,用于allm链表
schedlink muintptr // next指针,用于空闲链表
lockedg guintptr // 被锁住的g
createstack [32]uintptr //
lockedExt uint32 //
lockedInt uint32 // 是否锁定m,cgo用
mWaitList mWaitList //

mLockProfile mLockProfile // profile相关
profStack []uintptr // profile相关

waitunlockf func(*g, unsafe.Pointer) bool // 解锁函数
waitlock unsafe.Pointer // 解锁函数参数
waitTraceSkip int // skip数量
waitTraceBlockReason traceBlockReason // 挂起原因-trace用

syscalltick uint32 //
freelink *m // next指针,freem链表
trace mTraceState // trace相关数据

libcall libcall //
libcallpc uintptr //
libcallsp uintptr //
libcallg guintptr //
winsyscall winlibcall //

vdsoSP uintptr //
vdsoPC uintptr //

preemptGen atomic.Uint32 // 计数器,统计抢占次数

signalPending atomic.Uint32 // 待执行抢占

pcvalueCache pcvalueCache //

dlogPerM //

mOS //

chacha8 chacha8rand.State // 随机数种子
cheaprand uint64 // 随机数

locksHeldLen int //
locksHeld [10]heldLockInfo //

// 对齐
_ [goexperiment.SpinbitMutexInt * 700 * (2 - goarch.PtrSize/4)]byte
}

// m0,全局变量。与g0不同,m0只有一个
var m0 m

抢占标志

m结构体有3个字段用于抢占判断,他们的作用和差异如下

字段 作用 如何影响抢占 主要用途
lockedInt 是否锁定 M lockedInt > 0 时,该 M 不能被调度出去 LockOSThread(),用于绑定线程(如 CGo 调用)
locks M 持有的锁计数 locks > 0 时,调度器不会抢占 Goroutine 运行时内部锁(GC 期间、关键代码)
preemptoff 阻止抢占的标志 preemptoff 非空时,M 不能被抢占 调试、特殊场景防止调度

P - processor

processor-局部化缓存、元数据等。数量一般与CPU核心数一致,除几个特殊的m,其他m都需要获取绑定一个p才能执行用户代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// macOS下共8952字节
type p struct {
id int32 // 数组索引,[0,gomaxprocs-1]
status uint32 // 状态
link puintptr // next指针
schedtick uint32 // 调度计数器
syscalltick uint32 // syscall计次数
sysmontick sysmontick // sysmon调度次数
m muintptr // 当前m
mcache *mcache // mcache内存分配器
pcache pageCache // 页缓存
raceprocctx uintptr //

deferpool []*_defer // defer列表,基于deferpoolbuf的切片
deferpoolbuf [32]*_defer // defer列表,固定数组

goidcache uint64 // 起始goid
goidcacheend uint64 // 结束goid,用完后向sched.goidgen一次性申请16个id

runqhead uint32 // 本地队列头部g索引
runqtail uint32 // 本地队列尾部g索引
runq [256]guintptr // 本地队列
runnext guintptr // 本地队列头部g,优化,可被p偷取

gFree struct { // 本地g空闲链表
gList // 链表头
n int32 // 数量
}

sudogcache []*sudog // sudog列表,基于sudogbuf的切片
sudogbuf [128]*sudog // sudog列表,固定数组

mspancache struct { // mspan缓存
len int // 数量
buf [128]*mspan // 指针
}

pinnerCache *pinner //

trace pTraceState // trace相关数据

palloc persistentAlloc //

gcAssistTime int64 //
gcFractionalMarkTime int64 // fractional模式下的标记耗时

limiterEvent limiterEvent //

gcMarkWorkerMode gcMarkWorkerMode // GC标记工作模式
gcMarkWorkerStartTime int64 // 当前worker的标记开始时刻

gcw gcWork // GC wbuf缓冲区(灰色队列)

wbBuf wbBuf // 写屏障缓冲区

runSafePointFn uint32 // GC标志是否需要执行safePointFn

statsSeq atomic.Uint32 // 计数器,奇数表示p正在写入stats

timers timers // 定时器列表,每次执行调度时判断是否有g超时

maxStackScanDelta int64 //

scannedStackSize uint64 //
scannedStacks uint64 //

preempt bool // 是否抢占

gcStopTime int64 // 停止时刻,检查到STW后纪录
}

状态列表

status_name status_value description
_Pidle 0 空闲,未运行用户代码,runq为空。默认状态
_Prunning 1 运行中,P被M绑定,正在运行用户代码
_Psyscall 2 正在执行系统调用,未运行用户代码,可被其他M偷取,类似_Pidle
_Pgcstop 3 被M(STW)停止并拥有,当前M挂起。初始状态
_Pdead 4 不在使用,剥离所有资源(GOMAXPROCS缩容)

schedt - 调度器

schedt-调度器,纪录全局调度资源等,访问需要加锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
// macOS下共6824字节
type schedt struct {
goidgen atomic.Uint64 // goid全局计数器
lastpoll atomic.Int64 // 纪录上一次执行netpoll的时刻,0-正在轮询
pollUntil atomic.Int64 // netpoll轮询截止时刻

lock mutex // 锁

midle muintptr // 空闲m链表
nmidle int32 // 空闲m数量
nmidlelocked int32 // 空闲的locked的m数量
mnext int64 // 累计m总量/自增id
maxmcount int32 // 最大可用线程数量,默认为10000
nmsys int32 // sys类型m数量
nmfreed int64 // 累计释放m数量

ngsys atomic.Int32 // 系统G的数量

pidle puintptr // 空闲p链表
npidle atomic.Int32 // 空闲p数量
nmspinning atomic.Int32 // 自旋的m的数量
needspinning atomic.Uint32 // 需要某个m自旋放弃p

runq gQueue // 状态为_Grunnable的g
runqsize int32 // 状态为_Grunnable的g数量

disable struct { //
user bool // 是否禁止user类型的g运行
runnable gQueue // user字段为false时,user类型的g都阻塞在这里
n int32 // 总数量
}

gFree struct { // 全局gFree链表
lock mutex // 锁
stack gList // 有栈的g
noStack gList // 无栈的g
n int32 // 总数量
}

sudoglock mutex // sudog锁,用于sudogcache
sudogcache *sudog // 全局sudog链表

deferlock mutex // 锁,保护deferpool
deferpool *_defer // defer队列

freem *m // free链表

gcwaiting atomic.Bool // STW,当前p释放到空闲队列
stopwait int32 // 待_Pgcstop的p数量
stopnote note // semaphore,存储执行GC的m
sysmonwait atomic.Bool // true-sysmon挂起休眠了
sysmonnote note // semaphore,sysmon挂起在这里

safePointFn func(*p) // 到达安全点时执行的函数
safePointWait int32 // 同p的数量,检查全部p是否都已执行safePointFn
safePointNote note // semaphore,存储执行GC的m

profilehz int32 // profiler

procresizetime int64 // 调整p数量的时刻
totaltime int64 // 调整p时,纪录所有核心的运行时长

sysmonlock mutex // 锁,用于sysmon

timeToRun timeHistogram // 累计所有处于_Grunnable状态的g的时长

idleTime atomic.Int64 //

totalMutexWaitTime atomic.Int64 // 累计所有处于_Gwaiting状态的g的时长

stwStoppingTimeGC timeHistogram //
stwStoppingTimeOther timeHistogram //

stwTotalTimeGC timeHistogram //
stwTotalTimeOther timeHistogram //

totalRuntimeLockWaitTime atomic.Int64 // 纪录总等待耗时
}

schedinit - 调度器初始化

有点长,还是直接看注释吧

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// src/runtime/proc.go
func schedinit() {
// 调度器锁初始化
lockInit(&sched.lock, lockRankSched)
lockInit(&sched.sysmonlock, lockRankSysmon)
lockInit(&sched.deferlock, lockRankDefer)
lockInit(&sched.sudoglock, lockRankSudog)
// 全局变量锁初始化
lockInit(&deadlock, lockRankDeadlock)
lockInit(&paniclk, lockRankPanic)
lockInit(&allglock, lockRankAllg)
lockInit(&allpLock, lockRankAllp)
lockInit(&reflectOffs.lock, lockRankReflectOffs)
lockInit(&finlock, lockRankFin)
lockInit(&cpuprof.lock, lockRankCpuprof)
// allocm函数内使用
allocmLock.init(lockRankAllocmR, lockRankAllocmRInternal, lockRankAllocmW)
// 执行系统调用exec、clone时使用
execLock.init(lockRankExecR, lockRankExecRInternal, lockRankExecW)
// trace锁初始化
traceLockInit()
// memstats锁初始化
lockInit(&memstats.heapStats.noPLock, lockRankLeafRank)

// 验证m数据结构的大小
lockVerifyMSize()

// g0(m为m0)
gp := getg()

// 限制能创建的线程数量为10000
sched.maxmcount = 10000
// 文件描述符,用于panic时输出到指定文件
crashFD.Store(^uintptr(0))

// 停止整个世界,空函数(staticlockranking默认为false)
worldStopped()

// ticks纪录当前时刻和CPU时钟
ticks.init()
// 插件、动态库元数据校验
moduledataverify()
// 栈相关,全局stackpool和stackLarge初始化
stackinit()
// malloc初始化
mallocinit()
// 解析GODEBUG合并到默认值
godebug := getGodebugEarly()
// 根据环境变量初始化CPU,要求在alginit前调用
cpuinit(godebug)
// 全局随机状态初始化,要求在alginit、mcommoninit前调用
randinit()
// 哈希计算相关初始化
alginit()
// 到这里就能正常使用maps、hash、rand了

// m通用初始化
mcommoninit(gp.m, -1)
// 插件、动态库初始化,到这里activeModules可用
modulesinit()
// 初始化类型信息,用于reflect和GC(依赖activeModules)
typelinksinit()
// 接口表初始化,使用activeModules(依赖activeModules)
itabsinit()
// 栈对象初始化,要求在GC开启前完成
stkobjinit()

// 保存当前线程信号掩码到m.sigmask(平台相关)
sigsave(&gp.m.sigmask)
initSigmask = gp.m.sigmask

// 复制命令行参数
goargs()
// 复制环境变量
goenvs()
// GOTRACEBACK相关环境变量解析(平台相关)
secure()
// 检查stdin、stdout、stderr是否有效(平台相关)
checkfds()
// debug、godebug解析设置
parsedebugvars()
// GC初始化
gcinit()

// 用于崩溃时纪录栈信息
// 16KB的栈
gcrash.stack = stackalloc(16384)
// stackguard0(用于正常函数调用时检查)
gcrash.stackguard0 = gcrash.stack.lo + 1000
// stackguard1(用于调度恢复执行时检查)
gcrash.stackguard1 = gcrash.stack.lo + 1000

// 默认为false,忽略
if disableMemoryProfiling {
// 如果禁用heap profiling
// parsedebugvars会覆盖MemProfileRate值,没关系,以disableMemoryProfiling设置为准
MemProfileRate = 0
}

// 内存剖析调用栈初始化
// mcommoninit在parsedebugvars前运行,需要重新初始化
mProfStackInit(gp.m)

// 调度器加锁
lock(&sched.lock)
// lastpoll纪录当前时刻
sched.lastpoll.Store(nanotime())
// osinit时已纪录到全局变量ncpu
procs := ncpu
// 以GOMAXPROCS为准
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
// 根据数量n扩容/缩容p
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}
// 解锁
unlock(&sched.lock)

// 世界开始运行,空函数(staticlockranking默认为false)
worldStarted()

// 以下情况一般不应该发生

// go的版本号为空
if buildVersion == "" {
buildVersion = "unknown"
}

if len(modinfo) == 1 {
modinfo = ""
}
}

创建g负责运行main函数

newproc - 创建goroutine

调用newproc创建新的g用于运行runtime.main,不会立即执行,而是放入p的本地队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
// 编译器将go关键字编译为newproc
func newproc(fn *funcval) {
gp := getg()
pc := sys.GetCallerPC()
// 切换到g0执行
systemstack(func() {
// 创建goroutine,此时状态已为_Grunnable或_Gwaiting
newg := newproc1(fn, gp, pc, false, waitReasonZero)

// p
pp := getg().m.p.ptr()
// 把g放到本地队列头部
runqput(pp, newg, true)

// runtime.main已运行
if mainStarted {
// 从空闲队列拿一个p和一个m绑定并唤醒(可能拿不到p)
wakep()
}
})
}

// 创建goroutine
func newproc1(fn *funcval, callergp *g, callerpc uintptr, parked bool, waitreason waitReason) *g {
// 函数不能为nil
if fn == nil {
fatal("go of nil func value")
}

// m,禁止抢占
mp := acquirem()
// p
pp := mp.p.ptr()
// 从本地gFree获取一个g
newg := gfget(pp)
// 获取失败
if newg == nil {
// 创建g、分配栈(初始为2KB,GC时更新为平均值)
newg = malg(stackMin)
// 从_Gidle状态改为_Gdead,避免Gc扫描
casgstatus(newg, _Gidle, _Gdead)
// 把g放到allgs切片
allgadd(newg)
}
// 栈检查
if newg.stack.hi == 0 {
throw("newproc1: newg missing stack")
}

// 状态检查
if readgstatus(newg) != _Gdead {
throw("newproc1: new g is not Gdead")
}

// totalSize=4*8+0=32
totalSize := uintptr(4*goarch.PtrSize + sys.MinFrameSize)
// 内存对齐,totalSize=32
totalSize = alignUp(totalSize, sys.StackAlign)
// sp=stack.hi-32
sp := newg.stack.hi - totalSize

// usesLR默认为false,忽略
if usesLR {
// 调用者LR设为0
*(*uintptr)(unsafe.Pointer(sp)) = 0
// 空函数,忽略
prepGoExitFrame(sp)
}

// arm64,先忽略
if GOARCH == "arm64" {
// 调用者FP设为0
*(*uintptr)(unsafe.Pointer(sp - goarch.PtrSize)) = 0
}

// sched重置清零
memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))

// 纪录/更新g的相关信息

newg.sched.sp = sp
newg.stktopsp = sp
newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum
newg.sched.g = guintptr(unsafe.Pointer(newg))
// 调整fn、寄存器等信息
gostartcallfn(&newg.sched, fn)
newg.parentGoid = callergp.goid
newg.gopc = callerpc
// 为nil(tracebackancestors默认为0)
newg.ancestors = saveAncestors(callergp)
newg.startpc = fn.fn

// 判断g是否是sys类型,一般runtime.*的都是,除了少部分如runtime.main等
if isSystemGoroutine(newg, false) {
// sys类型
// 计数器+=1
sched.ngsys.Add(1)
} else {
// user类型
// 测试相关,忽略
newg.syncGroup = callergp.syncGroup
// g
if mp.curg != nil {
newg.labels = mp.curg.labels
}
// PProf已启动
if goroutineProfile.active {
// goroutineProfiled=2
newg.goroutineProfiled.Store(goroutineProfileSatisfied)
}
}

// 判断g是否要进行跟踪统计
// trackingSeq=随机数
newg.trackingSeq = uint8(cheaprand())
// trackingSeq%8 == 0(1/8的概率?)
if newg.trackingSeq%gTrackingPeriod == 0 {
newg.tracking = true
}

// 累计到maxStackScan,有p的话,先暂存到p
gcController.addScannableStack(pp, int64(newg.stack.hi-newg.stack.lo))

// 默认_Grunnable,如果parked为true,则_Gwaiting
var status uint32 = _Grunnable
if parked {
status = _Gwaiting
// _Gwaiting时reason不能为空
newg.waitreason = waitreason
}
// goid,每个p一次性从sched申请16个
if pp.goidcache == pp.goidcacheend {
// 调度器goidgen+=16。如果goidgen为0,则goidcache=16
pp.goidcache = sched.goidgen.Add(_GoidCacheBatch)
// 如果此时goidcache为16,操作后值为1
pp.goidcache -= _GoidCacheBatch - 1
// 结束goid
pp.goidcacheend = pp.goidcache + _GoidCacheBatch
}
// goid
newg.goid = pp.goidcache
// 从_Gdead状态改为_Grunnable或_Gwaiting
casgstatus(newg, _Gdead, status)
// 当前p的起始goid向后挪动一位(被消耗一位)
pp.goidcache++
// trace相关纪录数据等清空
newg.trace.reset()

releasem(mp)

return newg
}

// 创建g、分配栈
func malg(stacksize int32) *g {
newg := new(g)
if stacksize >= 0 {
// =round2(0+stacksize)
stacksize = round2(stackSystem + stacksize)
// 创建栈
systemstack(func() {
// 创建栈,暂不讨论
newg.stack = stackalloc(uint32(stacksize))
})
// stackguard0(用于正常函数调用时检查)=stack.lo+928 (Ps. 2KB瞬间少了一半)
newg.stackguard0 = newg.stack.lo + stackGuard
// stackguard1(用于调度恢复执行时检查)= ^uintptr(0)
newg.stackguard1 = ^uintptr(0)
// lo指向的位置设置为0
*(*uintptr)(unsafe.Pointer(newg.stack.lo)) = 0
}
return newg
}

runtime.main - 程序入口

main函数有两个,一个是跟底层运行时相关的runtime.main,另一个是用户编写的main.main。runtime.main内部负责运行时相关初始化,如执行所有包的init函数、开启GC、cgo初始化等,最后调用main.main

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
// src/runtime/proc.go
func main() {
// 当前g绑定的m
mp := getg().m

// 冲突检测相关,忽略
mp.g0.racectx = 0

// 64位系统,栈最大为1GB
if goarch.PtrSize == 8 {
maxstacksize = 1000000000
} else {
maxstacksize = 250000000
}

// 2GB
maxstackceiling = 2 * maxstacksize

// 全局变量,用以标志允许newproc创建运行M
mainStarted = true

// 除了wasm,其他平台默认开启sysmon
if haveSysmon {
// 汇编,切换到g0运行
systemstack(func() {
// 清理freem链表,创建并初始化m,locked或cgo类型的m由模板线程延迟创建,其他类型则立即调用平台相关函数创建线程
// sysmon负责轮询netpoll、抢占超时的g、回收阻塞在syscall的p
newm(sysmon, nil, -1)
})
}

// 双向绑定,防抢占
// 当前g、m纪录到m.lockedg、g.lockedm字段
lockOSThread()

// 期望当前m就是m0
if mp != &m0 {
throw("runtime.main not on m0")
}

// 当前时刻-单调时钟
runtimeInitTime = nanotime()
if runtimeInitTime == 0 {
throw("nanotime returning zero")
}

// 开启trace
if debug.inittrace != 0 {
inittrace.id = getg().goid
inittrace.active = true
}

// 执行runtime相关的init函数,必须放在defer之前
doInit(runtime_inittasks)

// 下面会设置为false,如果期间有任何panic,defer内unlock
needUnlock := true
defer func() {
if needUnlock {
// 重置m.lockedg、g.lockedm字段
unlockOSThread()
}
}()

// 运行sweeper、scavenger(GC)
gcenable()

// cgo使用,判断main是否完成相关初始化操作
main_init_done = make(chan bool)

// cgo初始化
if iscgo {
if _cgo_pthread_key_created == nil {
throw("_cgo_pthread_key_created missing")
}

if _cgo_thread_start == nil {
throw("_cgo_thread_start missing")
}
if GOOS != "windows" {
if _cgo_setenv == nil {
throw("_cgo_setenv missing")
}
if _cgo_unsetenv == nil {
throw("_cgo_unsetenv missing")
}
}
if _cgo_notify_runtime_init_done == nil {
throw("_cgo_notify_runtime_init_done missing")
}

if set_crosscall2 == nil {
throw("set_crosscall2 missing")
}
set_crosscall2()

startTemplateThread()
cgocall(_cgo_notify_runtime_init_done, nil)
}

// 执行模块相关的init函数
for m := &firstmoduledata; m != nil; m = m.next {
doInit(m.inittasks)
}

// 关闭trace
inittrace.active = false

// cgo完成初始化
close(main_init_done)

// 到这里没有panic,就不需要用defer来unlock了
needUnlock = false
// 重置m.lockedg、g.lockedm字段
unlockOSThread()

// 有main函数但不需要执行
// 动态库(c-archive/c-shared)
if isarchive || islibrary {
// wasm只有一个M
if GOARCH == "wasm" {
// 到这里阻塞,不应该返回
pause(sys.GetCallerSP() - 16)
panic("unreachable")
}
return
}

// main.main函数,需要linker在运行时确定地址
fn := main_main
// 执行main.main函数
fn()

// 如果有G发生panic,打印trace然后退出
// runningPanicDefers会在panic+1,recovery时-1
if runningPanicDefers.Load() != 0 {
// 防止运行时间过长
for c := 0; c < 1000; c++ {
if runningPanicDefers.Load() == 0 {
break
}
// 同协程yield关键字,当前g让出CPU,g0执行调度运行其他g,非抢占
Gosched()
}
}

// 遇到无法recover的panic时,panicking不为0
if panicking.Load() != 0 {
// 当前g让出CPU,g0执行调度运行其他g
gopark(nil, nil, waitReasonPanicWait, traceBlockForever, 1)
}
// panic了,运行hooks
runExitHooks(0)

// 汇编代码runtime·exit
exit(0)
for {
var x *int32
*x = 0
}
}

runtime·mstart - 运行调度函数

runtime·mstart为汇编函数,函数内调用runtime·mstart0

mstart0

mstart0主要进行栈初始化、信号注册等,最后运行调度函数schedule

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
// src/runtime/proc.go
// g0分配栈,m0进行信号处理初始化、执行mstartfn,寻找并运行可运行的g(该函数永不返回)
func mstart0() {
gp := getg()

// 是否未分配,可能是由操作系统直接分配的栈
osStack := gp.stack.lo == 0
if osStack {
size := gp.stack.hi
// 如果没有栈,设置大小为16KB
if size == 0 {
// 16384*1 => 16KB
size = 16384 * sys.StackGuardMultiplier
}
// 栈顶
gp.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
// 栈底(预留1KB?)
gp.stack.lo = gp.stack.hi - size + 1024
}
// stackguard0(用于正常函数调用时检查)=stack.lo+928
gp.stackguard0 = gp.stack.lo + stackGuard
// stackguard1(用于调度恢复执行时检查)=stack.lo+928
gp.stackguard1 = gp.stackguard0

// 信号处理初始化、执行mstartfn,寻找并运行可运行的g(该函数永不返回)
mstart1()
// 这里执行后不会返回

// 如果栈由系统分配(linux为false,由go分配)
if mStackIsSystemAllocated() {
osStack = true
}

// m线程退出
mexit(osStack)
}

// 信号处理初始化、执行mstartfn,寻找并运行可运行的g(该函数永不返回)
func mstart1() {
gp := getg()

// 期望当前g是g0
if gp != gp.m.g0 {
throw("bad runtime·mstart")
}

// 寄存器相关
gp.sched.g = guintptr(unsafe.Pointer(gp))
gp.sched.pc = sys.GetCallerPC()
gp.sched.sp = sys.GetCallerSP()

// 汇编代码,amd64为空函数
asminit()
// 信号处理初始化(平台相关)、纪录procid
minit()

// 如果是m0
if gp.m == &m0 {
// 信号处理初始化(平台相关),如果是cgo,生成一定数量的extraM,
mstartm0()
}

// dataindependenttiming默认为0,忽略
if debug.dataindependenttiming == 1 {
sys.EnableDIT()
}

if fn := gp.m.mstartfn; fn != nil {
// 执行mstartfn
// 主要有sysmon/templateThread/用户自定义这几类函数
fn()
}

// 非m0(从代码看,只有m0才会运行该函数才对)
if gp.m != &m0 {
// 绑定m和p,并清理p.mcache
acquirep(gp.m.nextp.ptr())
// 重置nextp
gp.m.nextp = 0
}

// 寻找并运行可运行的g(该函数永不返回)
schedule()
}

schedule - 调度函数

  1. 寻找g
    1. 如果lockedg有数据
      • 让出p并挂起休眠,直到lockedg状态变为可运行,被唤醒后绑定一个p返回
      • lockedg调整状态等数据,和m双向绑定,最后切换到lockedg的上下文执行(该函数永不返回)
    2. 执行findRunnable寻找可运行的g
    3. GC开始到标记结束这个过程只允许sys类型的g运行
      • 如果检查是user类型的g,回到开头重新findRunnable寻找
  2. p获取、与m绑定
    1. 如果当前m自旋中
      • 重置m.spinning,从空闲队列拿一个p和一个m绑定并唤醒(可能拿不到p)
    2. 如果g是GCworker或tracereader
      • 从空闲队列拿一个p和一个m绑定并唤醒(可能拿不到p)
    3. 如果g.lockedm有数据
      • m让出p给lockedm并唤醒,把当前m放到midle空闲链表并挂起休眠,被唤醒后绑定一个p返回
      • 回到开头重新findRunnable寻找
  3. g调整状态等数据,和m双向绑定,最后切换到g的上下文执行(该函数永不返回)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
// 寻找并运行可运行的g(该函数永不返回)
func schedule() {
mp := getg().m

// m已加锁
if mp.locks != 0 {
throw("schedule: holding locks")
}

// m.lockedg有数据
if mp.lockedg != 0 {
// m让出p并挂起休眠,直到lockedg状态变为可运行,被唤醒后绑定一个p返回
stoplockedm()
// 被唤醒

// lockedg调整状态等数据,和m双向绑定,最后切换到g的上下文执行(该函数永不返回)
execute(mp.lockedg.ptr(), false)
}

// cgo
if mp.incgo {
throw("schedule: in cgo")
}

top:
// p
pp := mp.p.ptr()
// 非抢占
pp.preempt = false

// 自旋中(runq应该为空) and (runnext有数据(这是访问优化) or runq不为空)
// 需要在checkTimers之前判断,因为其会把g放到runq
if mp.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
throw("schedule: spinning with local work")
}

// 寻找可运行的g
gp, inheritTime, tryWakeP := findRunnable()

// dontfreezetheworld默认为0。debug时为1,此时若panic则不会打断所有g运行
if debug.dontfreezetheworld > 0 && freezing.Load() {
lock(&deadlock)
lock(&deadlock)
}

// m自旋中(runq应该为空)
if mp.spinning {
// 重置m.spinning,从空闲队列拿一个p和一个m绑定并唤醒(可能拿不到p)
resetspinning()
}

// GC时只有sys类型的g允许运行
if sched.disable.user && !schedEnabled(gp) {
// 调度器加锁
lock(&sched.lock)
// double-check
// GC开始到标记结束这个过程只允许sys类型的g运行
if schedEnabled(gp) {
// 解锁
unlock(&sched.lock)
} else {
// GC运行中
// 放进disable.runnable队列
sched.disable.runnable.pushBack(gp)
// 计数器+1
sched.disable.n++
// 解锁
unlock(&sched.lock)
// 重试
goto top
}
}

// 只有在g是GCworker或tracereader时才为true
if tryWakeP {
// 从空闲队列拿一个p和一个m绑定并唤醒(可能拿不到p)
wakep()
}

// g.lockedm有数据
if gp.lockedm != 0 {
// 让出p给lockedm并唤醒,把当前m放到midle空闲链表并挂起休眠,被唤醒后绑定一个p返回
startlockedm(gp)
// 重试
goto top
}

// g调整状态等数据,和m双向绑定,最后切换到g的上下文执行(该函数永不返回)
execute(gp, inheritTime)
}

// g调整状态等数据,和m双向绑定,最后切换到g的上下文执行(该函数永不返回)
func execute(gp *g, inheritTime bool) {
mp := getg().m

// pprof相关
if goroutineProfile.active {
// 记录Goroutine运行的采样数据
tryRecordGoroutineProfile(gp, nil, osyield)
}

// g、m双向绑定
mp.curg = gp
gp.m = mp
// 从_Grunnable状态改为_Grunning
casgstatus(gp, _Grunnable, _Grunning)
// 重置waitsince
gp.waitsince = 0
// 非抢占
gp.preempt = false
// stack.lo+928
gp.stackguard0 = gp.stack.lo + stackGuard

// 是否继承当前时间切片
if !inheritTime {
// 不继承,schedtick+=1
mp.p.ptr().schedtick++
}

// m相关
hz := sched.profilehz
if mp.profilehz != hz {
// 平台相关
//
setThreadCPUProfiler(hz)
}

// 汇编,从 gp.sched 加载调度上下文,并执行
gogo(&gp.sched)
}

findRunnable - 寻找可运行的g

逻辑比较复杂,大概如下

  1. 如果遇到GC启动-STW,m释放p挂起休眠,被唤醒后重试
  2. 读取本地队列前尝试其他数据源
    1. 在GC标志停止阶段,执行安全点函数
      • 如果所有的p都执行了安全点函数,则唤醒GC线程
    2. 定时器p.timers检查
      • 清理最小堆,把所有标记删除的timer都移除出最小堆,如果最小的timer到期,则执行回调函数f运行
    3. 如果trace reader有g,返回该g
    4. 如果GC正在运行,从gcBgMarkWorkerPool获取一个g(dedicated/fractional类型),符合条件则返回
    5. 每执行61次schedtick-调度,从全局队列获取一个g返回
    6. 符合条件则运行finalizer、cgo,不讨论
  3. 本地队列不为空时,从本地队列拿一个g返回
  4. 全局队列不为空时,则拿走1/gomaxprocs数量的g,返回第一个g
  5. 本地队列、全局队列都没有数据
    1. 符合条件则执行netpoll轮询(非阻塞),把过期的g放入本地/全局队列,返回第一个g
    2. m自旋从其他p偷取g,成功返回第一个g,失败则回到开头重试
    3. 如果GC正在运行,从gcBgMarkWorkerPool获取一个g(idle类型),符合条件则返回
    4. wasm相关,不讨论
  6. 偷也没偷到,实在没事做,把p放回空闲队列,调度器加锁,double-check
    1. STW或GC标记停止阶段,返回开头重试
    2. 全局队列不为空,拿走1/gomaxprocs数量的g,返回第一个g
    3. 检查到调度器通知一个m进入自旋,当前m选择自旋,返回开头重试
    4. 取消p与m的绑定,把p放回空闲队列
      • 如果m在自旋,取消自旋
        • 再次检查全局队列,如果全局队列不为空,拿走1/gomaxprocs数量的g,返回第一个g
        • 从空闲链表拿一个p,如果没拿到则通知所有m,让其中一个m让出p并进入自旋等待
        • 尝试从空闲链表拿走一个p,成功则绑定m和p,返回开头重试
        • 如果GC已启动且有空闲的标记g,获取p和g,绑定m和p,返回g
      • 符合条件则执行netpoll轮询(阻塞)
        • 从空闲链表拿一个p,把过期的g放入本地/全局队列,有p则返回第一个g,无则回到开头重试
      • 如果netpoll已经被轮询,检查到有更早的过期时刻,则中断轮询
    5. 把m放到midle空闲链表并挂起休眠,被唤醒后绑定一个p返回开头重试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
// 寻找可运行的g
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
mp := getg().m

top:
pp := mp.p.ptr()
// STW,当前p释放到空闲队列
if sched.gcwaiting.Load() {
// 更新p、调度器状态,把m放到midle空闲链表并挂起休眠,被唤醒后绑定一个p返回
gcstopm()
// 被唤醒

// 重试
goto top
}

// GC标记停止阶段需要执行safePointFn
if pp.runSafePointFn != 0 {
// 执行safePointFn,如果是最后一个p,则唤醒safePointNote
runSafePointFn()
}

// 定时器p.timers检查
// 清理最小堆,把所有标记删除的timer都移除出最小堆,如果最小的timer到期,则执行回调函数f运行
now, pollUntil, _ := pp.timers.check(0)

// trace相关
if traceEnabled() || traceShuttingDown() {
gp := traceReader()
if gp != nil {
trace := traceAcquire()
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.ok() {
trace.GoUnpark(gp, 0)
traceRelease(trace)
}
return gp, false, true
}
}

// GC已启动(gcStart时为true)
if gcBlackenEnabled != 0 {
// 从gcBgMarkWorkerPool获取一个g,符合条件则返回
gp, tnow := gcController.findRunnableGCWorker(pp, now)
// 拿到了
if gp != nil {
return gp, false, true
}
// 没拿到
now = tnow
}

// 从全局队列拿g
// 每61个schedtick and 全局队列不为空
if pp.schedtick%61 == 0 && sched.runqsize > 0 {
// 调度器加锁
lock(&sched.lock)
// 从全局队列获取一批p放到本地队列,返回第一个g
gp := globrunqget(pp, 1)
// 解锁
unlock(&sched.lock)
// 拿到了
if gp != nil {
return gp, false, false
}
// 没拿到
}

// finalizer
if fingStatus.Load()&(fingWait|fingWake) == fingWait|fingWake {
// 尝试从fing拿走g
if gp := wakefing(); gp != nil {
// 将g放到p.runq队列头部,从空闲队列拿一个p和一个m绑定并唤醒(可能拿不到p)
ready(gp, 0, true)
}
}
// cgo,忽略
if *cgo_yield != nil {
// 汇编,执行cgo调用
asmcgocall(*cgo_yield, nil)
}

// 本地队列拿一个G
if gp, inheritTime := runqget(pp); gp != nil {
return gp, inheritTime, false
}

// 到这里,本地队列为空

// 全局队列不为空
if sched.runqsize != 0 {
// 调度器加锁
lock(&sched.lock)
// 从全局队列获取一批p放到本地队列,返回第一个g
gp := globrunqget(pp, 0)
// 解锁
unlock(&sched.lock)
// 拿到了
if gp != nil {
return gp, false, false
}
// 没拿到
}

// 到这里,全局队列为空

// netpoll相关
// netpoll已初始化 and 挂起的g数量不为0 and 当前没有进行netpoll轮询
if netpollinited() && netpollAnyWaiters() && sched.lastpoll.Load() != 0 {
// 平台相关
// 执行epollWait检查,0-没有数据立即返回
if list, delta := netpoll(0); !list.empty() {
// G列表里拿一个
gp := list.pop()
// 修改g状态放进本地/全局队列,并尝试唤醒m处理
injectglist(&list)
// 计数器+=delta => netpollWaiters+=delta
netpollAdjustWaiters(delta)

// 从_Gwaiting状态改为_Grunnable
casgstatus(gp, _Gwaiting, _Grunnable)

return gp, false, false
}
}

// 到这里,netpoll也没有数据

// 满足条件的话,m自旋然后从其他p偷取g
// m自旋中 or 双倍自旋m的数量没有超过忙碌中的p数量
if mp.spinning || 2*sched.nmspinning.Load() < gomaxprocs-sched.npidle.Load() {
// m未自旋
if !mp.spinning {
// 状态、计数器更新
// spinning=true nmspinning+=1 needspinning=0
mp.becomeSpinning()
}

// 尝试从所有其他p偷取g
gp, inheritTime, tnow, w, newWork := stealWork(now)
// 偷取成功
if gp != nil {
return gp, inheritTime, false
}

// 偷取失败

// 可能有新的timer或GC需要运行
if newWork {
// 重试
goto top
}

// 更新当前时刻
now = tnow
// w也是pollUntil
// pollUntil = min(pollUntil, w)
if w != 0 && (pollUntil == 0 || w < pollUntil) {
pollUntil = w
}
}

// 到这里,没有找到G,没事做

// GC已启动(gcStart时为true) and 还有标记任务可以执行 and idleMarkWorkers计数器加1成功
if gcBlackenEnabled != 0 && gcMarkWorkAvailable(pp) && gcController.addIdleMarkWorker() {
// 从gcBgMarkWorkerPool获取一个g
node := (*gcBgMarkWorkerNode)(gcBgMarkWorkerPool.pop())
// 有数据
if node != nil {
// gcMarkWorkerMode=3
pp.gcMarkWorkerMode = gcMarkWorkerIdleMode
// g
gp := node.gp.ptr()

// 从_Gwaiting状态改为_Grunnable
casgstatus(gp, _Gwaiting, _Grunnable)

return gp, false, false
}
// 恢复状态,idleMarkWorkers计数器减1
gcController.removeIdleMarkWorker()
}

// beforeIdle只在wasm有返回数据
gp, otherReady := beforeIdle(now, pollUntil)
if gp != nil {
// 从_Gwaiting状态改为_Grunnable
casgstatus(gp, _Gwaiting, _Grunnable)

return gp, false, false
}
if otherReady {
// 重试
goto top
}

// 到这里,真的没任何事情做,把P放到idle

// 获取快照allp、idlepMask、timerpMask
allpSnapshot := allp // 所有p
idlepMaskSnapshot := idlepMask // 标记哪些P是空闲的
timerpMaskSnapshot := timerpMask // 标记哪些P有timer

// 调度器加锁
lock(&sched.lock)
// 以下有一部分是加锁后的double-check

// STW,当前p释放到空闲队列 or GC标记停止阶段需要执行safePointFn
if sched.gcwaiting.Load() || pp.runSafePointFn != 0 {
unlock(&sched.lock)
goto top
}
// 全局队列不为空
if sched.runqsize != 0 {
// 从全局队列获取一批p放到本地队列,返回第一个g
gp := globrunqget(pp, 0)
unlock(&sched.lock)
return gp, false, false
}
// m未自旋 and 调度器通知让其中一个m让出p
if !mp.spinning && sched.needspinning.Load() == 1 {
// 状态、计数器更新
// spinning=true nmspinning+=1 needspinning=0
mp.becomeSpinning()
unlock(&sched.lock)
goto top
}

// 取消p与m的绑定
if releasep() != pp {
throw("findrunnable: wrong p")
}
// 把p放进空闲链表
now = pidleput(pp, now)
unlock(&sched.lock)

// 纪录,用于恢复状态
wasSpinning := mp.spinning

// m自旋中
if mp.spinning {
// 重置spinning
mp.spinning = false
// 复原,nmspinning-=1
if sched.nmspinning.Add(-1) < 0 {
// 小于0,异常
throw("findrunnable: negative nmspinning")
}

lock(&sched.lock)
// 全局队列不为空
if sched.runqsize != 0 {
// 从空闲链表拿一个p,如果没拿到则通知所有m,让其中一个m让出p并进入自旋等待
pp, _ := pidlegetSpinning(0)
// 拿到p!
if pp != nil {
// 从全局队列获取一批p放到本地队列,返回第一个g
gp := globrunqget(pp, 0)
// 状态异常
if gp == nil {
throw("global runq empty with non-zero runqsize")
}
unlock(&sched.lock)
// 绑定m和p,并清理p.mcache
acquirep(pp)
// 状态、计数器更新
// spinning=true nmspinning+=1 needspinning=0
mp.becomeSpinning()
return gp, false, false
}
}
unlock(&sched.lock)

// 全局队列为空 or 没有拿到p

// 只要有一个p是忙碌的,从剩余空闲p中拿走一个(可能拿不到)
pp := checkRunqsNoP(allpSnapshot, idlepMaskSnapshot)
if pp != nil {
// 绑定m和p,并清理p.mcache
acquirep(pp)
// 状态、计数器更新
// spinning=true nmspinning+=1 needspinning=0
mp.becomeSpinning()
goto top
}

// 如果GC已启动且有空闲的标记g,获取p和g
pp, gp := checkIdleGCNoP()
if pp != nil {
// 绑定m和p,并清理p.mcache
acquirep(pp)
// 状态、计数器更新
// spinning=true nmspinning+=1 needspinning=0
mp.becomeSpinning()

// gcMarkWorkerMode=3
pp.gcMarkWorkerMode = gcMarkWorkerIdleMode

// 从_Gwaiting状态改为_Grunnable
casgstatus(gp, _Gwaiting, _Grunnable)

return gp, false, false
}

// 最后
// 找到所有p中的最小pollUntil
pollUntil = checkTimersNoP(allpSnapshot, timerpMaskSnapshot, pollUntil)
}

// netpoll相关
// netpoll已初始化 and (挂起的g数量不为0 or 过期时刻不为0) and 当前没有执行netpoll轮询
if netpollinited() && (netpollAnyWaiters() || pollUntil != 0) && sched.lastpoll.Swap(0) != 0 {
// 纪录pollUntil
sched.pollUntil.Store(pollUntil)
// netpoll不能有p
if mp.p != 0 {
throw("findrunnable: netpoll with p")
}
// 自旋在上面已经判断过了
if mp.spinning {
throw("findrunnable: netpoll with spinning")
}
// 阻塞时长
// -1为永久阻塞
delay := int64(-1)
// 过期时刻不为0
if pollUntil != 0 {
// 当前时刻
if now == 0 {
now = nanotime()
}
// 过期时刻到当前时刻的时长/时差
delay = pollUntil - now
// 已过期
if delay < 0 {
// 0为非阻塞
delay = 0
}
}
// faketime,忽略
if faketime != 0 {
delay = 0
}

// netpoll轮询,根据delay判断是否要阻塞,获得G列表及数量
list, delta := netpoll(delay)
// 重新读取当前时刻
now = nanotime()
// 已经从netpoll回来,pollUntil重置为0
sched.pollUntil.Store(0)
// 纪录上一次执行netpoll的时刻
sched.lastpoll.Store(now)

// faketime,忽略
if faketime != 0 && list.empty() {
stopm()
goto top
}

// 调度器加锁
lock(&sched.lock)
// 从空闲链表拿一个p
pp, _ := pidleget(now)
// 解锁
unlock(&sched.lock)
// 没拿到p
if pp == nil {
// 修改g状态放进本地/全局队列,并尝试唤醒m处理
injectglist(&list)
// 计数器+=delta => netpollWaiters+=delta
netpollAdjustWaiters(delta)
} else {
// 绑定m和p,并清理p.mcache
acquirep(pp)
// 有数据
if !list.empty() {
// G列表里拿一个
gp := list.pop()
// 修改g状态放进本地/全局队列,并尝试唤醒m处理
injectglist(&list)
// 计数器+=delta => netpollWaiters+=delta
netpollAdjustWaiters(delta)

// 从_Gwaiting状态改为_Grunnable
casgstatus(gp, _Gwaiting, _Grunnable)

return gp, false, false
}
// 先前为自旋状态
if wasSpinning {
// 状态、计数器更新
// spinning=true nmspinning+=1 needspinning=0
mp.becomeSpinning()
}
goto top
}
} else if pollUntil != 0 && netpollinited() { // 说明正在执行netpoll轮询
pollerPollUntil := sched.pollUntil.Load()
// 轮询完毕 or 有更早过期的事件
if pollerPollUntil == 0 || pollerPollUntil > pollUntil {
// 中断netpoll轮询
netpollBreak()
}
}
// 把m放到midle空闲链表并挂起休眠,被唤醒后绑定一个p返回
stopm()
// 被唤醒

// 重试
goto top
}

到这里基本结束,下面是一些依赖的函数

相关依赖函数

goroutine

创建

newproc、malg,不在重复

终止

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
func Goexit() {
var p _panic
p.goexit = true

// 执行defer
p.start(sys.GetCallerPC(), unsafe.Pointer(sys.GetCallerSP()))
for {
fn, ok := p.nextDefer()
if !ok {
break
}
fn()
}

goexit1()
}

// 将g放回本地gFree,重新寻找并运行可运行的g
func goexit1() {
// 当前g切换到g0,运行goexit0函数
// 将g放回本地gFree,重新寻找并运行可运行的g(永不返回)
mcall(goexit0)
}

// 将g放回本地gFree,重新寻找并运行可运行的g
func goexit0(gp *g) {
// g重置状态、取消与m的双向绑定后放到本地gFree,如果有locked的m,则切换到g0调度执行
gdestroy(gp)
// 寻找并运行可运行的g(该函数永不返回)
schedule()
}

// g重置状态、取消与m的双向绑定后放到本地gFree,如果有locked的m,则切换到g0调度执行
func gdestroy(gp *g) {
// m
mp := getg().m
// p
pp := mp.p.ptr()

// 从_Grunning状态改为_Gdead
casgstatus(gp, _Grunning, _Gdead)

// 累计到maxStackScan,有p的话,先暂存到p
gcController.addScannableStack(pp, -int64(gp.stack.hi-gp.stack.lo))
// 判断g是否是sys类型,一般runtime.*的都是,除了少部分如runtime.main等
if isSystemGoroutine(gp, false) {
// ngsys-=1
sched.ngsys.Add(-1)
}
// g取消m绑定
gp.m = nil
// 是否有lockedm
locked := gp.lockedm != 0
// locked字段重置
gp.lockedm = 0
mp.lockedg = 0
//
gp.preemptStop = false
//
gp.paniconfault = false
//
gp._defer = nil
gp._panic = nil
gp.writebuf = nil
gp.waitreason = waitReasonZero
gp.param = nil
gp.labels = nil
// 定时器
gp.timer = nil
gp.syncGroup = nil

// GC已启动(gcStart时为true) and 额度还有剩余
if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 {
// 把g的额度累计到全局额度bgScanCredit
// 转换参数
assistWorkPerByte := gcController.assistWorkPerByte.Load()
// 字节转换成额度
scanCredit := int64(assistWorkPerByte * float64(gp.gcAssistBytes))
// 累计到全局额度bgScanCredit
gcController.bgScanCredit.Add(scanCredit)
// 重置为0
gp.gcAssistBytes = 0
}

// g、m解除绑定
dropg()

// wasm
if GOARCH == "wasm" {
// 把g放到本地gFree
gfput(pp, gp)
return
}

// 有locked的m and lockedInt计数器不为0
if locked && mp.lockedInt != 0 {
print("runtime: mp.lockedInt = ", mp.lockedInt, "\n")
if mp.isextra {
throw("runtime.Goexit called in a thread that was not created by the Go runtime")
}
throw("exited a goroutine internally locked to the OS thread")
}

// 把g放到本地gFree
gfput(pp, gp)

// 有locked的m
if locked {
// 非plan9,返回mstart释放p并退出线程
if GOOS != "plan9" {
// 汇编,从 g0.sched 加载调度上下文,并执行
gogo(&mp.g0.sched)
} else {
// plan9,清零
mp.lockedExt = 0
}
}
}

gFree - 空闲链表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// 把g放到本地gFree,如果gFree数量超过64,只保留32个g,剩下的放到全局gFree
func gfput(pp *p, gp *g) {
// 获取g.atomicstatus状态,判断是否是_Gdead
if readgstatus(gp) != _Gdead {
throw("gfput: bad status (not Gdead)")
}

// 栈大小
stksize := gp.stack.hi - gp.stack.lo

// 超过2KB
if stksize != uintptr(startingStackSize) {
// 释放栈
stackfree(gp.stack)
gp.stack.lo = 0
gp.stack.hi = 0
gp.stackguard0 = 0
}

// 放到本地gFree
pp.gFree.push(gp)
// 计数器+=1
pp.gFree.n++
// 如果本地gFree数量超过64
if pp.gFree.n >= 64 {
var (
inc int32
stackQ gQueue
noStackQ gQueue
)
// 保留最多32个数据
for pp.gFree.n >= 32 {
// g
gp := pp.gFree.pop()
// 计数器-=1
pp.gFree.n--

if gp.stack.lo == 0 {
// 未分配,无栈
noStackQ.push(gp)
} else {
// 有栈
stackQ.push(gp)
}
// 计数器+=1
inc++
}
// 全局gFree加锁
lock(&sched.gFree.lock)
// 无栈
sched.gFree.noStack.pushAll(noStackQ)
// 有栈
sched.gFree.stack.pushAll(stackQ)
sched.gFree.n += inc
// 解锁
unlock(&sched.gFree.lock)
}
}

// 从本地gFree获取一个g,如果本地gFree为空,从全局gFree一次性拿最多32个到本地
func gfget(pp *p) *g {
retry:
// 本地gFree列表为空 and 全局gFree列表不为空
if pp.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) {
// 全局gFree加锁
lock(&sched.gFree.lock)
// 填充32个数据
for pp.gFree.n < 32 {
// 优先获取有栈的g
gp := sched.gFree.stack.pop()
if gp == nil {
// 无栈的g
gp = sched.gFree.noStack.pop()
if gp == nil {
// 都没有数据
break
}
}
// 计数器-=1
sched.gFree.n--
// 放到本地gFree
pp.gFree.push(gp)
// 计数器+=1
pp.gFree.n++
}
// 解锁
unlock(&sched.gFree.lock)
// 重试
goto retry
}

// 获取一个g
gp := pp.gFree.pop()
// 本地跟全局gFree都没有数据
if gp == nil {
return nil
}
// 计数器-=1
pp.gFree.n--

// g有栈 and 栈大小超过2KB
if gp.stack.lo != 0 && gp.stack.hi-gp.stack.lo != uintptr(startingStackSize) {
systemstack(func() {
// 释放栈
stackfree(gp.stack)
gp.stack.lo = 0
gp.stack.hi = 0
gp.stackguard0 = 0
})
}

// g无栈
if gp.stack.lo == 0 {
systemstack(func() {
// 申请2KB的栈
gp.stack = stackalloc(startingStackSize)
})
// stackguard0(用于正常函数调用时检查)= stack.lo+928
gp.stackguard0 = gp.stack.lo + stackGuard
}
return gp
}

// 把本地gFree数据迁移到全局gFree里
func gfpurge(pp *p) {
var (
inc int32
stackQ gQueue
noStackQ gQueue
)
// 清空gFree
for !pp.gFree.empty() {
gp := pp.gFree.pop()
pp.gFree.n--
if gp.stack.lo == 0 {
// 无栈
noStackQ.push(gp)
} else {
// 有栈
stackQ.push(gp)
}
// 计数器
inc++
}
// 全局gFree加锁
lock(&sched.gFree.lock)
// 无栈
sched.gFree.noStack.pushAll(noStackQ)
// 有栈
sched.gFree.stack.pushAll(stackQ)
sched.gFree.n += inc
// 解锁
unlock(&sched.gFree.lock)
}

user/sys类型判断

1
2
3
4
5
6
7
8
9
10
11
12
// GC开始到标记结束这个过程只允许sys类型的g运行
func schedEnabled(gp *g) bool {
// 空函数(staticlockranking默认为false)
assertLockHeld(&sched.lock)

// GC时禁止user类型g
if sched.disable.user {
// 判断g是否是sys类型,一般runtime.*的都是,除了少部分如runtime.main等
return isSystemGoroutine(gp, true)
}
return true
}

status状态管理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
// 移除_Gscan标志位,如果旧状态不符合要求则抛出异常
func casfrom_Gscanstatus(gp *g, oldval, newval uint32) {
success := false

// 旧值
switch oldval {
default:
// 默认抛出异常
print("runtime: casfrom_Gscanstatus bad oldval gp=", gp, ", oldval=", hex(oldval), ", newval=", hex(newval), "\n")
dumpgstatus(gp)
throw("casfrom_Gscanstatus:top gp->status is not in scan state")
case _Gscanrunnable,
_Gscanwaiting,
_Gscanrunning,
_Gscansyscall,
_Gscanpreempted:
// 加上_Gscan标志标志位
if newval == oldval&^_Gscan {
success = gp.atomicstatus.CompareAndSwap(oldval, newval)
}
}
// 失败,抛出异常
if !success {
print("runtime: casfrom_Gscanstatus failed gp=", gp, ", oldval=", hex(oldval), ", newval=", hex(newval), "\n")
dumpgstatus(gp)
throw("casfrom_Gscanstatus: gp->status is not in scan state")
}
// 同releasem(staticlockranking默认为false)
releaseLockRankAndM(lockRankGscan)
}

// 非_Gscan状态转换为带_Gscan状态
func castogscanstatus(gp *g, oldval, newval uint32) bool {
switch oldval {
case _Grunnable,
_Grunning,
_Gwaiting,
_Gsyscall:
if newval == oldval|_Gscan {
r := gp.atomicstatus.CompareAndSwap(oldval, newval)
if r {
// 同acquirem(staticlockranking默认为false)
acquireLockRankAndM(lockRankGscan)
}
return r

}
}
print("runtime: castogscanstatus oldval=", hex(oldval), " newval=", hex(newval), "\n")
throw("castogscanstatus")
panic("not reached")
}

// 非_Gscan状态转换,统计g位于_Grunnable、_Gwaiting状态时所耗费的时间
func casgstatus(gp *g, oldval, newval uint32) {
// 有_Gscan标志位 or 新旧数值相同
if (oldval&_Gscan != 0) || (newval&_Gscan != 0) || oldval == newval {
systemstack(func() {
// Call on the systemstack to prevent print and throw from counting
// against the nosplit stack reservation.
print("runtime: casgstatus: oldval=", hex(oldval), " newval=", hex(newval), "\n")
throw("casgstatus: bad incoming values")
})
}

// 空函数(staticlockranking默认为false)
lockWithRankMayAcquire(nil, lockRankGscan)

// 延迟时间
const yieldDelay = 5 * 1000
var nextYield int64

// 不断尝试,直到GC完成
for i := 0; !gp.atomicstatus.CompareAndSwap(oldval, newval); i++ {
// 从_Gwaiting状态改为_Grunnable
if oldval == _Gwaiting && gp.atomicstatus.Load() == _Grunnable {
// 异常
systemstack(func() {
throw("casgstatus: waiting for Gwaiting but is Grunnable")
})
}
// 第一次尝试
if i == 0 {
// 延迟5us
nextYield = nanotime() + yieldDelay
}
// 还没到指定时刻
if nanotime() < nextYield {
// 等待atomicstatus复原
for x := 0; x < 10 && gp.atomicstatus.Load() != oldval; x++ {
// 自旋,短暂的忙等待(纳秒级)
procyield(1)
}
} else {
// 已到指定时刻
// 主动让出CPU,让OS选择其他线程运行(微秒级到毫秒级)
osyield()
// 延迟2.5us
nextYield = nanotime() + yieldDelay/2
}
}

// 完成状态修改

// 测试,忽略
if gp.syncGroup != nil {
systemstack(func() {
gp.syncGroup.changegstatus(gp, oldval, newval)
})
}

// 原状态为_Grunning
if oldval == _Grunning {
// casgstatusAlwaysTrack默认为false or trackingSeq/8 == 0
if casgstatusAlwaysTrack || gp.trackingSeq%gTrackingPeriod == 0 {
gp.tracking = true
}
// 计数器(初始值随机)+=1
gp.trackingSeq++
}

if !gp.tracking {
// tracking为false
return
}

// tracking为true
// 统计g位与_Grunnable、_Gwaiting状态时所耗费的时间

// 统计运行时长、阻塞(mutex)时长
switch oldval {
case _Grunnable: // 原状态为_Grunnable
now := nanotime()
// 计算g处于_Grunnable状态的时长
gp.runnableTime += now - gp.trackingStamp
// 重置
gp.trackingStamp = 0
case _Gwaiting: // 原状态为_Gwaiting
// 如果不是mutex导致的等待,不统计
if !gp.waitreason.isMutexWait() {
break
}
now := nanotime()
// 累计所有处于_Gwaiting状态的g的时长
sched.totalMutexWaitTime.Add((now - gp.trackingStamp) * gTrackingPeriod)
// 重置
gp.trackingStamp = 0
}

switch newval {
case _Gwaiting: // 新状态为_Gwaiting
// 如果不是mutex导致的等待,不统计
if !gp.waitreason.isMutexWait() {
break
}
now := nanotime()
// trackingStamp纪录当前时刻
gp.trackingStamp = now
case _Grunnable: // 新状态为_Grunnable
now := nanotime()
// trackingStamp纪录当前时刻
gp.trackingStamp = now
case _Grunning: // 新状态为_Grunning
// 取消tracking跟踪统计
gp.tracking = false
// 累计所有处于_Grunnable状态的g的时长
sched.timeToRun.record(gp.runnableTime)
// 重置
gp.runnableTime = 0
}
}

// 改为_Gwaiting状态并设置waitreason
func casGToWaiting(gp *g, old uint32, reason waitReason) {
gp.waitreason = reason
// 非_Gscan状态转换,统计g位于_Grunnable、_Gwaiting状态时所耗费的时间
casgstatus(gp, old, _Gwaiting)
}

// 改为_Gwaiting状态并设置waitreason
func casGToWaitingForGC(gp *g, old uint32, reason waitReason) {
if !reason.isWaitingForGC() {
throw("casGToWaitingForGC with non-isWaitingForGC wait reason")
}
casGToWaiting(gp, old, reason)
}

// 从_Grunning状态改为_Gscanpreempted
func casGToPreemptScan(gp *g, old, new uint32) {
if old != _Grunning || new != _Gscan|_Gpreempted {
throw("bad g transition")
}

// 同acquirem(staticlockranking默认为false)
acquireLockRankAndM(lockRankGscan)
// 不停尝试切换状态
for !gp.atomicstatus.CompareAndSwap(_Grunning, _Gscan|_Gpreempted) {
}
}

// 从_Gpreempted状态改为_Gwaiting
func casGFromPreempted(gp *g, old, new uint32) bool {
if old != _Gpreempted || new != _Gwaiting {
throw("bad g transition")
}
gp.waitreason = waitReasonPreempted
if !gp.atomicstatus.CompareAndSwap(_Gpreempted, _Gwaiting) {
return false
}
if sg := gp.syncGroup; sg != nil {
sg.changegstatus(gp, _Gpreempted, _Gwaiting)
}
return true
}

allg-全局列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 把g放到allg列表
func allgadd(gp *g) {
// 获取g.atomicstatus状态,如果是默认状态
if readgstatus(gp) == _Gidle {
throw("allgadd: bad status Gidle")
}

// 加锁
lock(&allglock)
// 放到allgs末尾
allgs = append(allgs, gp)
// 指针指向了过期的g,更新
if &allgs[0] != allgptr {
atomicstorep(unsafe.Pointer(&allgptr), unsafe.Pointer(&allgs[0]))
}
// 更新allglen
atomic.Storeuintptr(&allglen, uintptr(len(allgs)))
// 解锁
unlock(&allglock)
}

// 返回allg
func allGsSnapshot() []*g {
// 空函数(staticlockranking默认为false)
assertWorldStoppedOrLockHeld(&allglock)

return allgs[:len(allgs):len(allgs)]
}

machine

结构体验证

1
2
3
4
5
6
7
8
9
10
11
// 验证m数据结构的大小
func lockVerifyMSize() {
// m大小为1912,最后的size应为2040+8=2048
size := roundupsize(unsafe.Sizeof(m{}), false) + mallocHeaderSize
// 2048&1023 => 0
if size&mutexMMask != 0 {
print("M structure uses sizeclass ", size, "/", hex(size), " bytes; ",
"incompatible with mutex flag mask ", hex(mutexMMask), "\n")
throw("runtime.m memory alignment too small for spinbit mutex")
}
}

创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
// 清理freem链表,创建并初始化m,locked或cgo类型的m由模板线程延迟创建,其他类型则立即调用平台相关函数创建线程
func newm(fn func(), pp *p, id int64) {
acquirem()

// 清理freem链表,创建并初始化m
mp := allocm(pp, fn, id)
// 将p存储到nextp
mp.nextp.set(pp)
// 信号掩码设置
mp.sigmask = initSigmask
// g、m不为nil and (有lockedm or cgo) and 非plan9
if gp := getg(); gp != nil && gp.m != nil && (gp.m.lockedExt != 0 || gp.m.incgo) && GOOS != "plan9" {
// 加锁
lock(&newmHandoff.lock)
// 未设置,异常
if newmHandoff.haveTemplateThread == 0 {
throw("on a locked thread with no template thread")
}
// next指针,指向newm,m成为链表的头部
mp.schedlink = newmHandoff.newm
// newm链表指针设置为m
newmHandoff.newm.set(mp)
// 如果templateThread正在休眠
if newmHandoff.waiting {
// 重置
newmHandoff.waiting = false
// 唤醒m(m放在wake.key)
notewakeup(&newmHandoff.wake)
}
// 解锁
unlock(&newmHandoff.lock)
releasem(getg().m)
return
}
// 调用平台相关函数创建线程
newm1(mp)
releasem(getg().m)
}

// 调用平台相关函数创建线程
func newm1(mp *m) {
// cgo
if iscgo {
var ts cgothreadstart
if _cgo_thread_start == nil {
throw("_cgo_thread_start missing")
}
ts.g.set(mp.g0)
ts.tls = (*uint64)(unsafe.Pointer(&mp.tls[0]))
ts.fn = unsafe.Pointer(abi.FuncPCABI0(mstart))
// Prevent process clone
execLock.rlock()
asmcgocall(_cgo_thread_start, unsafe.Pointer(&ts))
execLock.runlock()
return
}
execLock.rlock()
// 调用平台相关函数创建线程
newosproc(mp)
execLock.runlock()
}

// 调用平台相关函数创建线程
// src/runtime/os_linux.go
func newosproc(mp *m) {
// g0栈顶
stk := unsafe.Pointer(mp.g0.stack.hi)

if false {
print("newosproc stk=", stk, " m=", mp, " g=", mp.g0, " clone=", abi.FuncPCABI0(clone), " id=", mp.id, " ostk=", &mp, "\n")
}

// 旧的sigset
var oset sigset
// 用sigset_all设置,旧的放到oset
sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
ret := retryOnEAGAIN(func() int32 {
// 成功返回tid,失败返回errno
r := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(abi.FuncPCABI0(mstart)))
// 成功返回,不需要tid
if r >= 0 {
return 0
}
// 返回errno
return -r
})
// 还原
sigprocmask(_SIG_SETMASK, &oset, nil)

// 创建线程失败
if ret != 0 {
print("runtime: failed to create new OS thread (have ", mcount(), " already; errno=", ret, ")\n")
if ret == _EAGAIN {
println("runtime: may need to increase max user processes (ulimit -u)")
}
throw("newosproc")
}
}

// 清理freem链表,创建并初始化m
func allocm(pp *p, fn func(), id int64) *m {
allocmLock.rlock()

acquirem()

gp := getg()
// 当前m没有p
if gp.m.p == 0 {
// 绑定m和p,并清理p.mcache
acquirep(pp)
}

// 待释放的m链表不为空,只保留仍在使用的m
if sched.freem != nil {
// 调度器加锁
lock(&sched.lock)
// 仍在使用的m链表
var newList *m
// 遍历整个freem链表
for freem := sched.freem; freem != nil; {
// m.freeWait
wait := freem.freeWait.Load()

// 把所有状态为freeMWait的m连接成一个新链表
// m.freeWait == 2 => 仍在使用,退出前的初始状态,线程还没退出
if wait == freeMWait {
// 获取下一个freem
next := freem.freelink
// 修改next指针(指向prev)
freem.freelink = newList
// prev指针指向freem
newList = freem
// 指向下一个freem
freem = next
continue
}

// 下面是m.freeWait != 2 的节点

// m.freeWait == 0 => exitThread,栈可回收
if wait == freeMStack {
systemstack(func() {
// 释放g0的栈
stackfree(freem.g0.stack)
})
}

// m.freeWait == 1 => 线程退出最后一步

// 下一个freem
freem = freem.freelink
}
// 替换freem链表
sched.freem = newList
// 解锁
unlock(&sched.lock)
}

// 创建m
mp := new(m)
// 设置启动函数
mp.mstartfn = fn
// m通用初始化
mcommoninit(mp, id)

// 每个m分配一个g0
// cgo or 栈由系统分配(linux为false,由go分配)
if iscgo || mStackIsSystemAllocated() {
// 创建g、分配栈,无栈
mp.g0 = malg(-1)
} else {
// 创建g、分配栈,16KB的栈
mp.g0 = malg(16384 * sys.StackGuardMultiplier)
}
// 双向绑定
mp.g0.m = mp

// 跟m当前p是同一个
if pp == gp.m.p.ptr() {
// 取消p与m的绑定
releasep()
}

releasem(gp.m)
allocmLock.runlock()
return mp
}

通用初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// m通用初始化
func mcommoninit(mp *m, id int64) {
gp := getg()

// 如果g不是g0
if gp != gp.m.g0 {
// 获取当前调用栈信息,存入createstack数组
callers(1, mp.createstack[:])
}

// 调度器加锁
lock(&sched.lock)

// 纪录m.id
if id >= 0 {
mp.id = id
} else {
// 如果没有超过最大数量限制,则返回全局唯一id
mp.id = mReserveID()
}

// 初始化m的随机状态:m.chacha8、m.cheaprand
mrandinit(mp)

// m.gsignal初始化(32KB栈的g,平台相关)
mpreinit(mp)
// 初始化成功
if mp.gsignal != nil {
// stack.lo+928
mp.gsignal.stackguard1 = mp.gsignal.stack.lo + stackGuard
}

// next指针,跟allm串联起来成为新的allm
mp.alllink = allm

// 更新allm指针
atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))
// 解锁
unlock(&sched.lock)

// cgo相关
if iscgo || GOOS == "solaris" || GOOS == "illumos" || GOOS == "windows" {
mp.cgoCallers = new(cgoCallers)
}
// 内存剖析调用栈初始化
mProfStackInit(mp)
}

信号处理初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// m0信号处理初始化(平台相关),如果是cgo,生成一定数量的extraM
func mstartm0() {
// cgoHasExtraM默认为false
// cgo或者是Windows
if (iscgo || GOOS == "windows") && !cgoHasExtraM {
// 设置为true
cgoHasExtraM = true
// 生成extraMWaiters数量的g、m并纪录在allg、extraM
newextram()
}
// 信号处理初始化(平台相关)
initsig(false)
}

// 信号处理初始化(平台相关)
func initsig(preinit bool) {
if !preinit {
// preinit为false时
signalsOK = true
}

// 动态库(c-archive/c-shared) and
if (isarchive || islibrary) && !preinit {
return
}

// 32个元素
for i := uint32(0); i < _NSIG; i++ {
t := &sigtable[i]
// 默认? or 状态为_SigDefault(=16,第5位)
if t.flags == 0 || t.flags&_SigDefault != 0 {
continue
}

// 没有其他G并行处理,不需要CAS
fwdSig[i] = getsig(i)

// 安装go级别的函数
if !sigInstallGoHandler(i) {
// 默认行为、忽略信号
if fwdSig[i] != _SIG_DFL && fwdSig[i] != _SIG_IGN {
// 注册备用信号栈
setsigstack(i)
} else if fwdSig[i] == _SIG_IGN {
// 初始化并检测哪些信号在进程启动时被忽略
sigInitIgnored(i)
}
continue
}

// 数组,32个元素
handlingSig[i] = 1
// 安装信号
setsig(i, abi.FuncPCABIInternal(sighandler))
}
}

自增ID

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 如果没有超过最大数量限制,则返回全局唯一id
func mReserveID() int64 {
// 空函数(staticlockranking默认为false)
assertLockHeld(&sched.lock)

// 判断是否会溢出
if sched.mnext+1 < sched.mnext {
throw("runtime: thread ID overflow")
}
// id=当前mnext
id := sched.mnext
// mnext+=1
sched.mnext++
// 检查m的数量是否超过最大数量限制
checkmcount()
return id
}

数量限制检测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 检查m的数量是否超过最大数量限制
func checkmcount() {
// 空函数(staticlockranking默认为false)
assertLockHeld(&sched.lock)

// 累计总量 - 累计释放m数量 - 运行中的extraM数量 - 未运行的extraM数量
// sched.mnext - sched.nmfreed - extraMInUse - extraMLength
count := mcount() - int32(extraMInUse.Load()) - int32(extraMLength.Load())
// 超过最大数量限制
if count > sched.maxmcount {
print("runtime: program exceeds ", sched.maxmcount, "-thread limit\n")
throw("thread exhaustion")
}
}

midle-全局空闲队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// 把m放到midle空闲链表
func mput(mp *m) {
// 空函数(staticlockranking默认为false)
assertLockHeld(&sched.lock)

// 把m放到midle链表头部

// next指针指向midle
mp.schedlink = sched.midle
// mp替换midle
sched.midle.set(mp)
// 计数器+1
sched.nmidle++
// 检查是否存在死锁
checkdead()
}

// 从midle空闲链表中拿一个m
func mget() *m {
// 空函数(staticlockranking默认为false)
assertLockHeld(&sched.lock)

// midle链表头部
mp := sched.midle.ptr()
// 有空闲m
if mp != nil {
// schedlink即为next指针
sched.midle = mp.schedlink
// 计数器-1
sched.nmidle--
}
return mp
}

m启动/停止

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
// 从midle空闲链表中拿一个m绑定p并唤醒(可能拿不到p)
func startm(pp *p, spinning, lockheld bool) {
mp := acquirem()

// 需要加锁,返回时再根据lockheld同步锁状态
if !lockheld {
lock(&sched.lock)
}

// p为nil
if pp == nil {
// spinning为true时,p不能为nil
if spinning {
throw("startm: P required for spinning=true")
}

// spinning为false

// 从空闲链表拿一个p
pp, _ = pidleget(0)
// 没有拿到
if pp == nil {
// 返回前同步锁状态
if !lockheld {
unlock(&sched.lock)
}
releasem(mp)
return
}
}

// 到这里,p不可能为nil

// 从midle空闲链表中拿一个m
nmp := mget()

// 没有拿到m
if nmp == nil {
// 如果没有超过最大数量限制,则返回全局唯一id,避免死锁
id := mReserveID()
unlock(&sched.lock)

var fn func()

// 同步spinning状态
if spinning {
// 该函数将m.spinning置为true
fn = mspinning
}

// 清理freem链表,创建并初始化m,locked或cgo类型的m由模板线程延迟创建,其他类型则立即调用平台相关函数创建线程
newm(fn, pp, id)

// 返回前同步锁状态
if lockheld {
lock(&sched.lock)
}

releasem(mp)
return
}

// 拿到空闲的m了

// 返回前同步锁状态
if !lockheld {
unlock(&sched.lock)
}
// 空闲m不能有spinning状态
if nmp.spinning {
throw("startm: m is spinning")
}
// nextp不应该有数据
if nmp.nextp != 0 {
throw("startm: m has p")
}
// spinning意味着本地队列为空,这里数据不一致
if spinning && !runqempty(pp) {
throw("startm: p has runnable gs")
}
//同步spinning状态
nmp.spinning = spinning
// 不直接绑定p,而是绑定nextp
nmp.nextp.set(pp)
// 唤醒m(m放在m.park.key)
notewakeup(&nmp.park)

releasem(mp)
}

// 把m放到midle空闲链表并挂起休眠,被唤醒后绑定一个p返回
func stopm() {
gp := getg()

// m已加锁
if gp.m.locks != 0 {
throw("stopm holding locks")
}
// m没有让出p
if gp.m.p != 0 {
throw("stopm holding p")
}
// 自旋中
if gp.m.spinning {
throw("stopm spinning")
}

// 调度器加锁
lock(&sched.lock)
// 把m放到midle空闲链表
mput(gp.m)
// 解锁
unlock(&sched.lock)

// m挂起休眠,等待唤醒
mPark()
// 被唤醒

// 绑定m和p,并清理p.mcache
acquirep(gp.m.nextp.ptr())
// 重置nextp
gp.m.nextp = 0
}

线程退出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
// m线程退出
func mexit(osStack bool) {
mp := getg().m

// m0
if mp == &m0 {
// 当m执行syscall或锁定时,让出p给其他m或者把p放回空闲队列
handoffp(releasep())
// 调度器加锁
lock(&sched.lock)
// 累计释放m数量+=1
sched.nmfreed++
// 检查是否存在死锁
checkdead()
// 解锁
unlock(&sched.lock)
// m挂起休眠,等待唤醒
mPark()
// 被唤醒
throw("locked m0 woke up")
}

// 普通m(按道理,只有m0才会运行该函数才对)

// 阻塞所有信号(平台相关)
sigblock(true)
// 信号处理(平台相关)
unminit()

// gsignal
if mp.gsignal != nil {
// 释放栈
stackfree(mp.gsignal.stack)
// g被释放
mp.gsignal = nil
}

// 调度器加锁
lock(&sched.lock)
// 遍历allm
for pprev := &allm; *pprev != nil; pprev = &(*pprev).alllink {
// 找到当前m
if *pprev == mp {
// 链接下一个m
*pprev = mp.alllink
goto found
}
}
// 异常
throw("m not found in allm")
found:

// m.freeWait == 2 => 仍在使用,退出前的初始状态,线程还没退出
mp.freeWait.Store(freeMWait)
// m放到freem链表
mp.freelink = sched.freem
// 替换freem链表
sched.freem = mp
// 解锁
unlock(&sched.lock)

// m.ncgocall累加到ncgocall
atomic.Xadd64(&ncgocall, int64(mp.ncgocall))
// 纪录总等待耗时
sched.totalRuntimeLockWaitTime.Add(mp.mLockProfile.waitTime.Load())

// 当m执行syscall或锁定时,让出p给其他m或者把p放回空闲队列
handoffp(releasep())

// 调度器加锁
lock(&sched.lock)
// 累计释放m数量+=1
sched.nmfreed++
// 检查是否存在死锁
checkdead()
// 解锁
unlock(&sched.lock)

// macOS/iOS
if GOOS == "darwin" || GOOS == "ios" {
if mp.signalPending.Load() != 0 {
pendingPreemptSignals.Add(-1)
}
}

// 释放资源(平台相关)
mdestroy(mp)

// 栈是否由系统分配
if osStack {
// m.freeWait == 1 => 标记线程退出最后一步
mp.freeWait.Store(freeMRef)

// 返回mstart,由系统释放g0栈并终止线程
return
}

// 直接退出(linux下为汇编函数runtime·exitThread)
exitThread(&mp.freeWait)
}

自旋等待

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 重置m.spinning,从空闲队列拿一个p和一个m绑定并唤醒(可能拿不到p)
func resetspinning() {
gp := getg()
// 状态不一致
if !gp.m.spinning {
throw("resetspinning: not a spinning m")
}
// 重置spinning
gp.m.spinning = false
// nmspinning-=1
nmspinning := sched.nmspinning.Add(-1)
// 数量小于0,异常
if nmspinning < 0 {
throw("findrunnable: negative nmspinning")
}

// 从空闲队列拿一个p和一个m绑定并唤醒(可能拿不到p)
wakep()
}

processor

创建/容量调整

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
// 根据数量n调整p容量
func procresize(nprocs int32) *p {
// 空函数(staticlockranking默认为false)
assertLockHeld(&sched.lock)
assertWorldStopped()

// 旧值
old := gomaxprocs
if old < 0 || nprocs <= 0 {
throw("procresize: invalid arg")
}

// 当前时刻
now := nanotime()
// 上一次调整的时刻
if sched.procresizetime != 0 {
// 所有核心的运行时长
sched.totaltime += int64(old) * (now - sched.procresizetime)
}
// 更新调整时刻
sched.procresizetime = now

// 32个p一组,每个p用一个bit表示状态
maskWords := (nprocs + 31) / 32

// 调整p
if nprocs > int32(len(allp)) {
// 加锁
lock(&allpLock)
// 缩容
if nprocs <= int32(cap(allp)) {
allp = allp[:nprocs]
} else {
// 扩容
// 创建新的allp
nallp := make([]*p, nprocs)
// 复制旧的p
copy(nallp, allp[:cap(allp)])
// 替换allp
allp = nallp
}

// 缩容/不变
if maskWords <= int32(cap(idlepMask)) {
idlepMask = idlepMask[:maskWords]
timerpMask = timerpMask[:maskWords]
} else {
// 扩容,状态位数量=maskWords*32

// idlepMask
nidlepMask := make([]uint32, maskWords)
copy(nidlepMask, idlepMask)
idlepMask = nidlepMask

// timerpMask
ntimerpMask := make([]uint32, maskWords)
copy(ntimerpMask, timerpMask)
timerpMask = ntimerpMask
}
// 解锁
unlock(&allpLock)
}

// 新创建的p要进行初始化
for i := old; i < nprocs; i++ {
pp := allp[i]
// 创建p
if pp == nil {
pp = new(p)
}
// 初始化
pp.init(i)
// 纪录p指针
atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
}

gp := getg()
// 当前p的id<nprocs
if gp.m.p != 0 && gp.m.p.ptr().id < nprocs {
// 可以继续使用
gp.m.p.ptr().status = _Prunning
// mcache清理
// alloc列表mspan放到partial或full链表、tiny区域清空
// 清空stackcache,如果是_GCoff阶段,将空的mspan释放回mheap
gp.m.p.ptr().mcache.prepareForSweep()
} else {
// 销毁p前,先释放

// 当前p
if gp.m.p != 0 {
// 解除m绑定
gp.m.p.ptr().m = 0
}
// m解除p绑定
gp.m.p = 0
// 拿第一个p
pp := allp[0]
// 解除m绑定
pp.m = 0
// 重置为默认状态_Pidle
pp.status = _Pidle
// 绑定m和p,并清理p.mcache
acquirep(pp)
}

// g.m.p还没绑定,暂时不需要mcache0
mcache0 = nil

// 缩容后,allp销毁多余的p
for i := nprocs; i < old; i++ {
pp := allp[i]
// p销毁
pp.destroy()
// 还不能回收,因为可能有正在执行syscall的m在引用
}

// 调整
if int32(len(allp)) != nprocs {
lock(&allpLock)
allp = allp[:nprocs]
idlepMask = idlepMask[:maskWords]
timerpMask = timerpMask[:maskWords]
unlock(&allpLock)
}

// 非空闲的p,链表
var runnablePs *p
// 从后往前扫描
for i := nprocs - 1; i >= 0; i-- {
pp := allp[i]
// p当前使用着
if gp.m.p.ptr() == pp {
continue
}
// 重置为默认状态_Pidle
pp.status = _Pidle
// 本地队列为空
if runqempty(pp) {
// 把p放进空闲链表
pidleput(pp, now)
} else {
// 从midle空闲链表中拿一个m,放到p.m
pp.m.set(mget())
// next指针设置
pp.link.set(runnablePs)
// 替换链表头
runnablePs = pp
}
}
// 偷取顺序重置
stealOrder.reset(uint32(nprocs))
// 编译器检查用,判断是否是int32类型
var int32p *int32 = &gomaxprocs
// 更新gomaxprocs
atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
// p的数量有改动
if old != nprocs {
// 重置capacity等字段,capacity=nprocs秒
gcCPULimiter.resetCapacity(now, nprocs)
}
// 返回非空闲的p
return runnablePs
}

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// p初始化
func (pp *p) init(id int32) {
// 纪录p在allp内的索引
pp.id = id
// 初始化,状态为_Pgcstop
pp.status = _Pgcstop
// 0长度切片
pp.sudogcache = pp.sudogbuf[:0]
// 0长度切片
pp.deferpool = pp.deferpoolbuf[:0]
// 写屏障缓冲区
pp.wbBuf.reset()
// mcache
if pp.mcache == nil {
// 如果是第一个p
if id == 0 {
// 全局变量未初始化
if mcache0 == nil {
throw("missing mcache?")
}
// 使用全局变量
pp.mcache = mcache0
} else {
// 运行时创建
// 从cachealloc分配器获取mcache
pp.mcache = allocmcache()
}
}
// 锁初始化(定时器timers最小堆)
lockInit(&pp.timers.mu, lockRankTimers)

// 只要p在使用,timerpMask就要标记该p
timerpMask.set(id)
// p清理空闲标志,这个标记可以实时改
idlepMask.clear(id)
}

pidle-全局空闲链表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// 把p放进空闲链表
func pidleput(pp *p, now int64) int64 {
// 空函数(staticlockranking默认为false)
assertLockHeld(&sched.lock)

// 本地队列不为空
if !runqempty(pp) {
throw("pidleput: P has non-empty run queue")
}
// 当前时刻
if now == 0 {
now = nanotime()
}
// timers最小堆为空
if pp.timers.len.Load() == 0 {
// 清除标记
timerpMask.clear(pp.id)
}
// 标记为空闲状态
idlepMask.set(pp.id)
// 设置next指针
pp.link = sched.pidle
// 放到链表头部
sched.pidle.set(pp)
// 空闲p数量+1
sched.npidle.Add(1)
// stamp存储limiterEventIdle和now
if !pp.limiterEvent.start(limiterEventIdle, now) {
throw("must be able to track idle limiter event")
}
return now
}

// 从空闲链表拿一个p
func pidleget(now int64) (*p, int64) {
// 空函数(staticlockranking默认为false)
assertLockHeld(&sched.lock)

// 空闲p链表
pp := sched.pidle.ptr()
if pp != nil {
// 当前时刻
if now == 0 {
now = nanotime()
}
// timer可能随时都会加进来,非空闲状态都要设置
timerpMask.set(pp.id)
// 取消空闲标记
idlepMask.clear(pp.id)
// 把下一个pp放到pidle(这是一个链表)
sched.pidle = pp.link
// 空闲p数量-1
sched.npidle.Add(-1)
// 重置stamp字段,纪录耗时
pp.limiterEvent.stop(limiterEventIdle, now)
}
return pp, now
}

// 从空闲链表拿一个p,如果没拿到则通知所有m,让其中一个m让出p并进入自旋等待
func pidlegetSpinning(now int64) (*p, int64) {
// 空函数(staticlockranking默认为false)
assertLockHeld(&sched.lock)

// 从空闲链表拿一个p
pp, now := pidleget(now)
if pp == nil {
// 通知所有m,让其中一个m丢弃p,进入自旋等待
sched.needspinning.Store(1)
return nil, now
}

return pp, now
}

销毁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// p销毁
func (pp *p) destroy() {
// 空函数(staticlockranking默认为false)
assertLockHeld(&sched.lock)
assertWorldStopped()

// 迁移本地队列
// 本地队列不为空
for pp.runqhead != pp.runqtail {
// tail索引
pp.runqtail--
// g
gp := pp.runq[pp.runqtail%uint32(len(pp.runq))].ptr()
// 把g放到全局队列头部
globrunqputhead(gp)
}
// runnext不为空
if pp.runnext != 0 {
// 把g放到全局队列头部
globrunqputhead(pp.runnext.ptr())
// 重置runnext
pp.runnext = 0
}

// 迁移timers
// 把最小堆里的timer全部迁移到当前p
getg().m.p.ptr().timers.take(&pp.timers)

// 写屏障
// GC运行中
if gcphase != _GCoff {
// 扫描p.wbBuf,把标记好的数据放在p.gcw,最后清空p.wbBuf
wbBufFlush1(pp)
// wbuf1、wbuf2根据容量选择放入work.empty或work.full队列
pp.gcw.dispose()
}
// 清空sudog
for i := range pp.sudogbuf {
pp.sudogbuf[i] = nil
}
// 重置为0长度切片
pp.sudogcache = pp.sudogbuf[:0]
//
pp.pinnerCache = nil
// 清空defer
for j := range pp.deferpoolbuf {
pp.deferpoolbuf[j] = nil
}
// 重置为0长度切片
pp.deferpool = pp.deferpoolbuf[:0]
// mcache
systemstack(func() {
// 遍历mspancache
for i := 0; i < pp.mspancache.len; i++ {
// 放回spanalloc的空闲链表
mheap_.spanalloc.free(unsafe.Pointer(pp.mspancache.buf[i]))
}
// 清0
pp.mspancache.len = 0
lock(&mheap_.lock)
// 清空pcache
pp.pcache.flush(&mheap_.pages)
unlock(&mheap_.lock)
})
// mcache相关字段清空并放回cachealloc分配器空闲链表
freemcache(pp.mcache)
// 重置
pp.mcache = nil
// 把本地gFree数据迁移到全局gFree里
gfpurge(pp)
// 重置为0
pp.gcAssistTime = 0
// 标记为_Pdead
pp.status = _Pdead
}

runq-本地goroutine队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
// 把g放到本地队列
func runqput(pp *p, gp *g, next bool) {
// wasm(一般情况都有sysmon) and 放在队列首
if !haveSysmon && next {
// 放在队列尾
next = false
}
// 一般情况下raceenabled为false
// raceenabled==true and 放在队列首 and 1/2的概率?
if randomizeScheduler && next && randn(2) == 0 {
// 放在队列尾
next = false
}

// 放在队列首
if next {
retryNext:
// 纪录runnext
oldnext := pp.runnext
// 尝试替换
if !pp.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
// 替换失败,重试
goto retryNext
}
// 替换成功
// 原runnext没有纪录,直接返回
if oldnext == 0 {
return
}
// 原runnext有G指针
// 修改gp为原runnext,后面放在队列尾
gp = oldnext.ptr()
}

retry:
// head索引
h := atomic.LoadAcq(&pp.runqhead) // load-acquire
// tail索引
t := pp.runqtail
// 没有超过runq的容量-256
if t-h < uint32(len(pp.runq)) {
// 放在队列尾
pp.runq[t%uint32(len(pp.runq))].set(gp)
// 更新tail索引
atomic.StoreRel(&pp.runqtail, t+1) // store-release
return
}
// 超过队列容量
// 把一半的g放在调度器的全局队列
if runqputslow(pp, gp, h, t) {
return
}
// 失败重试
goto retry
}

// 把一半的g放在调度器的全局队列
func runqputslow(pp *p, gp *g, h, t uint32) bool {
// runq的一半+1个待入队的g
var batch [len(pp.runq)/2 + 1]*g

// 总量
n := t - h
// 一半
n = n / 2
// runq未满,异常
if n != uint32(len(pp.runq)/2) {
throw("runqputslow: queue is not full")
}
// 拿走一半的g
for i := uint32(0); i < n; i++ {
batch[i] = pp.runq[(h+i)%uint32(len(pp.runq))].ptr()
}
// 更新head索引
if !atomic.CasRel(&pp.runqhead, h, h+n) { // cas-release
return false
}
// 待入队的g放在尾部
batch[n] = gp

// 如果raceenabled为true,忽略
if randomizeScheduler {
for i := uint32(1); i <= n; i++ {
j := cheaprandn(i + 1)
batch[i], batch[j] = batch[j], batch[i]
}
}

// 把这部份g链接起来作为一个链表
for i := uint32(0); i < n; i++ {
batch[i].schedlink.set(batch[i+1])
}

// 转换成gQueue类型
var q gQueue
q.head.set(batch[0])
q.tail.set(batch[n])

// 调度器加锁
lock(&sched.lock)
// 把一批g放到全局队列
globrunqputbatch(&q, int32(n+1))
// 解锁
unlock(&sched.lock)
return true
}

// 把一批g放到本地队列尾部,如果本地队列满了放到全局队列
func runqputbatch(pp *p, q *gQueue, qsize int) {
// head索引
h := atomic.LoadAcq(&pp.runqhead)
// tail索引
t := pp.runqtail
// 已入队g数量
n := uint32(0)
// 没有超过runq的容量-256
for !q.empty() && t-h < uint32(len(pp.runq)) {
// 待入队链表首个g
gp := q.pop()
// 放入runq
pp.runq[t%uint32(len(pp.runq))].set(gp)
// 索引/计数器更新
t++
n++
}

// 队列可能已满

// 剩余待入队g数量
qsize -= int(n)

// 如果raceenabled为true,忽略
if randomizeScheduler {
off := func(o uint32) uint32 {
return (pp.runqtail + o) % uint32(len(pp.runq))
}
for i := uint32(1); i < n; i++ {
j := cheaprandn(i + 1)
pp.runq[off(i)], pp.runq[off(j)] = pp.runq[off(j)], pp.runq[off(i)]
}
}

// 更新tail索引
atomic.StoreRel(&pp.runqtail, t)
// q队列不为空,也就是说还有g没有入队
if !q.empty() {
// 调度器加锁
lock(&sched.lock)
// 把一批g放到全局队列
globrunqputbatch(q, int32(qsize))
// 解锁
unlock(&sched.lock)
}
}

// 从p的runq拿一个g
func runqget(pp *p) (gp *g, inheritTime bool) {
// runnext有数据,直接返回
next := pp.runnext
// runnext不为0 and 原子替换出runnext
if next != 0 && pp.runnext.cas(next, 0) {
// 成功拿到runnext
return next.ptr(), true
}

for {
// head索引
h := atomic.LoadAcq(&pp.runqhead) // load-acquire
// tail索引
t := pp.runqtail
// 队列为空
if t == h {
return nil, false
}
// 拿队列第一个
gp := pp.runq[h%uint32(len(pp.runq))].ptr()
// 更新head索引
if atomic.CasRel(&pp.runqhead, h, h+1) { // cas-release
return gp, false
}
}
}

// 抽走p本地队列所有数据
func runqdrain(pp *p) (drainQ gQueue, n uint32) {
// 原runnext
oldNext := pp.runnext
// double-check and 原子替换出runnext
if oldNext != 0 && pp.runnext.cas(oldNext, 0) {
// 放进队列
drainQ.pushBack(oldNext.ptr())
n++
}

retry:
// head索引
h := atomic.LoadAcq(&pp.runqhead) // load-acquire
// tail索引
t := pp.runqtail
// 本地队列数据量
qn := t - h
// 本地队列为空
if qn == 0 {
return
}
// 数据不一致
if qn > uint32(len(pp.runq)) {
// 重试
goto retry
}

// 先更新索引,因为可能会跟runqsteal并行运行

// 更新head索引
if !atomic.CasRel(&pp.runqhead, h, h+qn) { // cas-release
// 失败重试
goto retry
}

// 将所有g放进drainQ队列
for i := uint32(0); i < qn; i++ {
gp := pp.runq[(h+i)%uint32(len(pp.runq))].ptr()
drainQ.pushBack(gp)
n++
}
return
}

// 从指定p的本地队列偷走一半g
// pp是其他p
// batch是当前p的runq指针
// batchHead是当前p的runq的尾索引
func runqgrab(pp *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
for {
// head索引
h := atomic.LoadAcq(&pp.runqhead) // load-acquire
// tail索引
t := atomic.LoadAcq(&pp.runqtail) // load-acquire
// 总量
n := t - h
// 一半
n = n - n/2
// runq为空
if n == 0 {
// 如果需要把pp.runnext偷走
if stealRunNextG {
// pp.runnext有数据
if next := pp.runnext; next != 0 {
// p的状态为_Prunning
if pp.status == _Prunning {
// 睡眠3us,确保当前g还在忙碌,没空执行runnext
// osHasLowResTimer一般为false
if !osHasLowResTimer {
usleep(3)
} else {
// 其他平台的定时器的颗粒度是1-15ms,太大了,不适合
// 主动让出CPU,让OS选择其他线程运行(微秒级到毫秒级)
osyield()
}
}
// 原子替换出runnext
if !pp.runnext.cas(next, 0) {
// 替换失败,重试
continue
}
// 只偷走runnext
batch[batchHead%uint32(len(batch))] = next
return 1
}
}
// 不需要偷走runnext,返回0
return 0
}

// runq不为空

// 数据前后不一致
if n > uint32(len(pp.runq)/2) {
// 重试
continue
}
// 拿走一半的g
for i := uint32(0); i < n; i++ {
// g
g := pp.runq[(h+i)%uint32(len(pp.runq))]
// 放到当前p.runq
batch[(batchHead+i)%uint32(len(batch))] = g
}
// 更新head索引
if atomic.CasRel(&pp.runqhead, h, h+n) { // cas-release
return n
}
}
}

// 从p2的runq偷走一半的g放到p的runq,最后返回一个g
func runqsteal(pp, p2 *p, stealRunNextG bool) *g {
// tail索引
t := pp.runqtail
// 从p2的本地队列偷走一半的g
n := runqgrab(p2, &pp.runq, t, stealRunNextG)
// 没偷到
if n == 0 {
return nil
}
// [0,n)
n--
// 拿最后一个g
gp := pp.runq[(t+n)%uint32(len(pp.runq))].ptr()
// 总共才偷到一个g
if n == 0 {
return gp
}

// 偷到多个g的情况

// head索引
h := atomic.LoadAcq(&pp.runqhead) // load-acquire
// 超过队列容量,异常
if t-h+n >= uint32(len(pp.runq)) {
throw("runqsteal: runq overflow")
}
// 更新tail索引
atomic.StoreRel(&pp.runqtail, t+n) // store-release
return gp
}

调度器相关

g/m挂起、唤醒

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
// 同协程yield关键字,当前g让出CPU,g0执行调度运行其他g,非抢占
func Gosched() {
// 该函数只在wasm有效
checkTimeouts()
// 当前g切换到g0,运行gosched_m函数
// 当前g、m解除绑定,g交给其他空闲m执行,当前m重新寻找并运行可运行的g
mcall(gosched_m)
}

// 当前g、m解除绑定,g交给其他空闲m执行,当前m重新寻找并运行可运行的g
func gosched_m(gp *g) {
// 非抢占
// 当前g、m解除绑定,g交给其他空闲m执行,当前m重新寻找并运行可运行的g
goschedImpl(gp, false)
}

// 当前g、m解除绑定,g交给其他空闲m执行,当前m重新寻找并运行可运行的g
func goschedImpl(gp *g, preempted bool) {
// 获取g.atomicstatus状态
status := readgstatus(gp)
// 异常,_Grunning位未设置
if status&^_Gscan != _Grunning {
dumpgstatus(gp)
throw("bad g status")
}

// 从_Grunning状态改为_Grunnable
casgstatus(gp, _Grunning, _Grunnable)

// g、m解除绑定
dropg()
// 调度器加锁
lock(&sched.lock)
// 把g放到全局队列尾部
globrunqput(gp)
// 解锁
unlock(&sched.lock)

// runtime已执行初始化,允许newproc创建运行M
if mainStarted {
// 从空闲队列拿一个p和一个m绑定并唤醒(可能拿不到p)
wakep()
}

// 寻找并运行可运行的g(该函数永不返回)
schedule()
}

// 当前g让出CPU,g0执行调度运行其他g
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceReason traceBlockReason, traceskip int) {
// 非time.Sleep
if reason != waitReasonSleep {
// 该函数只在wasm有效
checkTimeouts()
}

mp := acquirem()
gp := mp.curg

// 获取g.atomicstatus状态
status := readgstatus(gp)
// 异常,_Grunning位未设置
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
// 纪录解锁函数、参数等信息
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waitTraceBlockReason = traceReason
mp.waitTraceSkip = traceskip

releasem(mp)
// 当前g切换到g0,运行park_m函数
mcall(park_m)
}

// 将g放到本地队列,从空闲队列拿一个p和一个m绑定并唤醒(可能拿不到p)
func goready(gp *g, traceskip int) {
systemstack(func() {
// 将g放到p.runq队列,从空闲队列拿一个p和一个m绑定并唤醒(可能拿不到p)
// 放到队列头部
ready(gp, traceskip, true)
})
}

// 将g放到p.runq队列,从空闲队列拿一个p和一个m绑定并唤醒(可能拿不到p)
func ready(gp *g, traceskip int, next bool) {
// 获取g.atomicstatus状态
status := readgstatus(gp)

mp := acquirem()
// 预期状态是_Gwaiting,异常
if status&^_Gscan != _Gwaiting {
// 打印g状态
dumpgstatus(gp)
throw("bad g->status in ready")
}

// 从_Gwaiting状态改为_Grunnable
casgstatus(gp, _Gwaiting, _Grunnable)

// 把g放到本地队列,根据next判断放在首尾
runqput(mp.p.ptr(), gp, next)
// 从空闲队列拿一个p和一个m绑定并唤醒(可能拿不到p)
wakep()

releasem(mp)
}

// 将g放回本地队列尾部,重新寻找并运行可运行的g
func goyield() {
// 该函数只在wasm有效
checkTimeouts()
// 当前g切换到g0,运行goyield_m函数
// 将g放回本地队列尾部,重新寻找并运行可运行的g
mcall(goyield_m)
}

// 将g放回本地队列尾部,重新寻找并运行可运行的g
func goyield_m(gp *g) {
pp := gp.m.p.ptr()

// 从_Grunning状态改为_Grunnable
casgstatus(gp, _Grunning, _Grunnable)

// g、m解除绑定
dropg()
// 把g放到本地队列尾部
runqput(pp, gp, false)
// 寻找并运行可运行的g(该函数永不返回)
schedule()
}

// m挂起休眠,等待唤醒
func mPark() {
gp := getg()
// semaphore,只有g0可以执行

// 挂起休眠(m放在m.park.key),标记blocked为true
notesleep(&gp.m.park)
// 被notewakeup唤醒,标记blocked为false

// 将m.park.key重置为0
noteclear(&gp.m.park)
}

// 解除绑定、修改状态、运行调度函数
func park_m(gp *g) {
mp := getg().m

// 测试,忽略
sg := gp.syncGroup
if sg != nil {
sg.incActive()
}

// 从_Grunning状态改为_Gwaiting
casgstatus(gp, _Grunning, _Gwaiting)

// g、m解除绑定
dropg()

// 解锁函数
if fn := mp.waitunlockf; fn != nil {
// 执行是否成功
ok := fn(gp, mp.waitlock)
// 移除函数及其参数数据
mp.waitunlockf = nil
mp.waitlock = nil
// 异常
if !ok {
// 从_Gwaiting状态改为_Grunnable
casgstatus(gp, _Gwaiting, _Grunnable)
// 测试,忽略
if sg != nil {
sg.decActive()
}
// g调整状态等数据,和m双向绑定,最后切换到g的上下文执行(该函数永不返回)
execute(gp, true)
}
}

// 测试,忽略
if sg != nil {
sg.decActive()
}

// 寻找并运行可运行的g(该函数永不返回)
schedule()
}

// g、m解除绑定
func dropg() {
gp := getg()

setMNoWB(&gp.m.curg.m, nil)
setGNoWB(&gp.m.curg, nil)
}

获取p

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// 从空闲队列拿一个p和一个m绑定并唤醒(可能拿不到p)
func wakep() {
// 期望nmspinning为0,且nmspinning+=1成功
if sched.nmspinning.Load() != 0 || !sched.nmspinning.CompareAndSwap(0, 1) {
return
}

// 此时nmspinning=1

mp := acquirem()

var pp *p
// 调度器加锁
lock(&sched.lock)
// 从空闲链表拿一个p,如果没拿到则通知所有m,让其中一个m让出p并进入自旋等待
pp, _ = pidlegetSpinning(0)
// 没拿到
if pp == nil {
// 复原,nmspinning-=1
if sched.nmspinning.Add(-1) < 0 {
// 小于0,异常
throw("wakep: negative nmspinning")
}
// 解锁
unlock(&sched.lock)
releasem(mp)
return
}

// 解锁
unlock(&sched.lock)

// 从midle空闲链表中拿一个m绑定p并唤醒(可能拿不到p)
startm(pp, true, false)

releasem(mp)
}

// 只要有一个p是忙碌的,从剩余空闲p中拿走一个(可能拿不到)
func checkRunqsNoP(allpSnapshot []*p, idlepMaskSnapshot pMask) *p {
// 遍历所有p
for id, p2 := range allpSnapshot {
// p2忙碌中 and p2的本地队列不为空
if !idlepMaskSnapshot.read(uint32(id)) && !runqempty(p2) {
// 调度器加锁
lock(&sched.lock)
// 从空闲链表拿一个p,如果没拿到则通知所有m,让其中一个m让出p并进入自旋等待
pp, _ := pidlegetSpinning(0)
// 没拿到p
if pp == nil {
// 剩下的p也不用找了
unlock(&sched.lock)
return nil
}
// 找到了
// 解锁
unlock(&sched.lock)
return pp
}
}

// 所有的p都处于默认空闲状态或者runq为空
return nil
}

// 找到所有p中的最小pollUntil
func checkTimersNoP(allpSnapshot []*p, timerpMaskSnapshot pMask, pollUntil int64) int64 {
// 遍历所有p
for id, p2 := range allpSnapshot {
// p2可能有timer
if timerpMaskSnapshot.read(uint32(id)) {
// 获取最小堆的最小when
w := p2.timers.wakeTime()
// w也是pollUntil
// pollUntil = min(pollUntil, w)
if w != 0 && (pollUntil == 0 || w < pollUntil) {
pollUntil = w
}
}
}

// 所有p中的最小值
return pollUntil
}

// 如果GC已启动且有空闲的标记g,获取p和g
func checkIdleGCNoP() (*p, *g) {
// GC未启动/停止 or 计数器已超限
if atomic.Load(&gcBlackenEnabled) == 0 || !gcController.needIdleMarkWorker() {
return nil, nil
}
// 没有标记任务可以执行
if !gcMarkWorkAvailable(nil) {
return nil, nil
}

// 调度器加锁
lock(&sched.lock)
// 从空闲链表拿一个p,如果没拿到则通知所有m,让其中一个m让出p并进入自旋等待
pp, now := pidlegetSpinning(0)
// 没拿到
if pp == nil {
unlock(&sched.lock)
return nil, nil
}

// 拿到了

// GC未启动/停止 or idleMarkWorkers计数器加1成功
if gcBlackenEnabled == 0 || !gcController.addIdleMarkWorker() {
// 把p放进空闲链表
pidleput(pp, now)
unlock(&sched.lock)
return nil, nil
}

//
node := (*gcBgMarkWorkerNode)(gcBgMarkWorkerPool.pop())
//
if node == nil {
// 把p放进空闲链表
pidleput(pp, now)
unlock(&sched.lock)
// 恢复状态,idleMarkWorkers计数器减1
gcController.removeIdleMarkWorker()
return nil, nil
}

unlock(&sched.lock)

return pp, node.gp.ptr()
}

偷取任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
// 尝试从所有其他p偷取g
func stealWork(now int64) (gp *g, inheritTime bool, rnow, pollUntil int64, newWork bool) {
// p
pp := getg().m.p.ptr()

// 是否有执行过timer
ranTimer := false

// 最多尝试4次
const stealTries = 4
for i := 0; i < stealTries; i++ {
// 如果是最后一次,偷取timer或者runnext
stealTimersOrRunNextG := i == stealTries-1

// 随机选一个p
for enum := stealOrder.start(cheaprand()); !enum.done(); enum.next() {
// STW,当前p释放到空闲队列
if sched.gcwaiting.Load() {
return nil, false, now, pollUntil, true
}
// p2
p2 := allp[enum.position()]
// 同一个p
if pp == p2 {
continue
}

// 如果需要偷取timer或者runnext and p可能有timer
if stealTimersOrRunNextG && timerpMask.read(enum.position()) {
// 定时器p.timers检查
// 清理最小堆,把所有标记删除的timer都移除出最小堆,如果最小的timer到期,则执行回调函数f运行
tnow, w, ran := p2.timers.check(now)
// 当前时刻
now = tnow
// w也是pollUntil
// pollUntil = min(pollUntil, w)
if w != 0 && (pollUntil == 0 || w < pollUntil) {
pollUntil = w
}
// 如果有执行过timer,有g被放到runq
if ran {
// 从p2的runq拿一个g
if gp, inheritTime := runqget(pp); gp != nil {
// 成功
// 这里的ranTimer是不是应该改为true?
return gp, inheritTime, now, pollUntil, ranTimer
}
// 没拿到
// 标记,表示有执行过timer
ranTimer = true
}
}

// 非空闲状态,如果p2处于默认的空闲状态,则不处理
if !idlepMask.read(enum.position()) {
// 从p2的runq偷走一半的g放到p的runq,最后返回一个g
if gp := runqsteal(pp, p2, stealTimersOrRunNextG); gp != nil {
// 偷到了
return gp, false, now, pollUntil, ranTimer
}
}
}
}

return nil, false, now, pollUntil, ranTimer
}

runq-全局队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
// 把g放到全局队列尾部
func globrunqput(gp *g) {
// 空函数(staticlockranking默认为false)
assertLockHeld(&sched.lock)

// 放到全局队列末尾
sched.runq.pushBack(gp)
// 计数器+1
sched.runqsize++
}

// 把g放到全局队列头部
func globrunqputhead(gp *g) {
// 空函数(staticlockranking默认为false)
assertLockHeld(&sched.lock)

// 放到全局队列头部
sched.runq.push(gp)
// 计数器+1
sched.runqsize++
}

// 把一批g放到全局队列
func globrunqputbatch(batch *gQueue, n int32) {
// 空函数(staticlockranking默认为false)
assertLockHeld(&sched.lock)

// 把q2链表放到q链表末尾
sched.runq.pushBackAll(*batch)
// 计数器+n
sched.runqsize += n
// 数据清理
*batch = gQueue{}
}

// 从全局队列获取一批p放到本地队列,返回第一个g
func globrunqget(pp *p, max int32) *g {
// 空函数(staticlockranking默认为false)
assertLockHeld(&sched.lock)

// 全局队列为空
if sched.runqsize == 0 {
return nil
}

// 按p数量均分
n := sched.runqsize/gomaxprocs + 1
// 边界处理
if n > sched.runqsize {
n = sched.runqsize
}
// 最多max个g
if max > 0 && n > max {
n = max
}
// 最多128个g
if n > int32(len(pp.runq))/2 {
n = int32(len(pp.runq)) / 2
}

// 计数器-n
sched.runqsize -= n

// 拿链表第一个g
gp := sched.runq.pop()
// 总量-1
n--
// 如果剩余总量不为0
for ; n > 0; n-- {
// 一次拿一个g
gp1 := sched.runq.pop()
// 把g放到本地队列尾部
runqput(pp, gp1, false)
}
// 返回第一个g
return gp
}

// 修改g状态放进本地/全局队列,并尝试唤醒m处理
func injectglist(glist *gList) {
// 为空,不处理
if glist.empty() {
return
}

// 第一个g
head := glist.head.ptr()
var tail *g
qsize := 0

// 定位到队列的尾部、计算数据量、g状态修改
for gp := head; gp != nil; gp = gp.schedlink.ptr() {
tail = gp
qsize++
// 非_Gscan状态转换,统计g位于_Grunnable、_Gwaiting状态时所耗费的时间
casgstatus(gp, _Gwaiting, _Grunnable)
}

// gList转换为gQueue
var q gQueue
q.head.set(head)
q.tail.set(tail)
*glist = gList{}

startIdle := func(n int) {
for i := 0; i < n; i++ {
mp := acquirem()
// 调度器加锁
lock(&sched.lock)

// 从空闲链表拿一个p,如果没拿到则通知所有m,让其中一个m让出p并进入自旋等待
pp, _ := pidlegetSpinning(0)
// 没拿到p
if pp == nil {
// 解锁,返回
unlock(&sched.lock)
releasem(mp)
break
}

// 从midle空闲链表中拿一个m绑定p并唤醒(可能拿不到p)
startm(pp, false, true)
// 解锁
unlock(&sched.lock)
releasem(mp)
}
}

// p
pp := getg().m.p.ptr()

// 没有p
if pp == nil {
// 调度器加锁
lock(&sched.lock)
// 把一批g放到全局队列
globrunqputbatch(&q, int32(qsize))
// 解锁
unlock(&sched.lock)
// 尝试唤醒m处理
startIdle(qsize)
return
}

// 有p

// 空闲p数量
npidle := int(sched.npidle.Load())
var (
globq gQueue
n int
)
// 拿一批最多与空闲p数量相等的g
for n = 0; n < npidle && !q.empty(); n++ {
g := q.pop()
globq.pushBack(g)
}
// 有空闲p
if n > 0 {
// 调度器加锁
lock(&sched.lock)
// 把上面这批g放到全局队列
globrunqputbatch(&globq, int32(n))
// 解锁
unlock(&sched.lock)
// 尝试唤醒m处理
startIdle(n)
// 剩余g数量
qsize -= n
}

// q队列不为空,也就是说还有g没有入队
if !q.empty() {
// 把一批g放到本地队列尾部,如果本地队列满了放到全局队列
runqputbatch(pp, &q, qsize)
}

// 从空闲队列拿一个p和一个m绑定并唤醒(可能拿不到p)
wakep()
}

g/m绑定、解绑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
func lockOSThread() {
// 计数器m.lockedInt+=1
getg().m.lockedInt++
dolockOSThread()
}

func dolockOSThread() {
if GOARCH == "wasm" {
return
}
gp := getg()

// 放在lockedg、lockedm,双向绑定
gp.m.lockedg.set(gp)
gp.lockedm.set(gp.m)
}

func unlockOSThread() {
gp := getg()
// 异常
if gp.m.lockedInt == 0 {
// 抛出异常
systemstack(badunlockosthread)
}
// 计数器m.lockedInt-=1
gp.m.lockedInt--
// 重置lockedg、lockedm
dounlockOSThread()
}

func dounlockOSThread() {
if GOARCH == "wasm" {
return
}
gp := getg()
// 计数器m.lockedInt不为0,不处理
if gp.m.lockedInt != 0 || gp.m.lockedExt != 0 {
return
}
// 重置
gp.m.lockedg = 0
gp.lockedm = 0
}

// 让出p给lockedm并唤醒,把当前m放到midle空闲链表并挂起休眠,被唤醒后绑定一个p返回
func startlockedm(gp *g) {
// m
mp := gp.lockedm.ptr()
// m跟当前g的m不一样
if mp == getg().m {
throw("startlockedm: locked to me")
}
// nextp有数据
if mp.nextp != 0 {
throw("startlockedm: m has p")
}
// 空闲的locked的m数量-=1
incidlelocked(-1)
// 取消p与m的绑定
pp := releasep()
// nextp设置为p
mp.nextp.set(pp)
// 唤醒m(m放在m.park.key)
notewakeup(&mp.park)
// 把m放到midle空闲链表并挂起休眠,被唤醒后绑定一个p返回
stopm()
}

// m让出p并挂起休眠,直到lockedg状态变为可运行,被唤醒后绑定一个p返回
func stoplockedm() {
gp := getg()

// m.lockedg为0(不为0才会走到这里) or g.lockedm不是当前m
if gp.m.lockedg == 0 || gp.m.lockedg.ptr().lockedm.ptr() != gp.m {
throw("stoplockedm: inconsistent locking")
}
// p不为空,把这个p给其他m
if gp.m.p != 0 {
// 取消p与m的绑定
pp := releasep()
// 当m执行syscall或锁定时,让出p给其他m或者把p放回空闲队列
handoffp(pp)
}

// 复原,空闲的locked的m数量+=1,检查是否存在死锁
incidlelocked(1)

// m挂起休眠,等待唤醒
mPark()
// 被唤醒

// 获取locked的g.atomicstatus状态
status := readgstatus(gp.m.lockedg.ptr())

// 如果状态不是_Grunnable,抛出异常
if status&^_Gscan != _Grunnable {
print("runtime:stoplockedm: lockedg (atomicstatus=", status, ") is not Grunnable or Gscanrunnable\n")
dumpgstatus(gp.m.lockedg.ptr())
throw("stoplockedm: not runnable")
}
// 绑定m和p,并清理p.mcache
acquirep(gp.m.nextp.ptr())
// 重置nextp
gp.m.nextp = 0
}

// 空闲的locked的m数量调整
func incidlelocked(v int32) {
// 调度器加锁
lock(&sched.lock)
sched.nmidlelocked += v
// v为-1时加锁,为1时是解锁
if v > 0 {
// 检查是否存在死锁
checkdead()
}
// 解锁
unlock(&sched.lock)
}

m/p绑定、解绑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
// 绑定m和p,并清理p.mcache
func acquirep(pp *p) {
// 绑定m和p
wirep(pp)

// 开启写屏障

// mcache清理
// alloc列表mspan放到partial或full链表、tiny区域清空
// 清空stackcache,如果是_GCoff阶段,将空的mspan释放回mheap
pp.mcache.prepareForSweep()
}

// 绑定m和p
func wirep(pp *p) {
gp := getg()

// m已经跟其他p绑定
if gp.m.p != 0 {
systemstack(func() {
throw("wirep: already in go")
})
}

// p也有其他m绑定着 or p的状态不是默认的_Pidle
if pp.m != 0 || pp.status != _Pidle {
// 抛出异常
systemstack(func() {
id := int64(0)
if pp.m != 0 {
id = pp.m.ptr().id
}
print("wirep: p->m=", pp.m, "(", id, ") p->status=", pp.status, "\n")
throw("wirep: invalid p state")
})
}

// m.p绑定p
gp.m.p.set(pp)
// p.m绑定m
pp.m.set(gp.m)
// 从_Pidle状态改为_Prunning
pp.status = _Prunning
}

// 取消p与m的绑定
func releasep() *p {
// 取消p与m的绑定
return releasepNoTrace()
}

// 取消p与m的绑定
func releasepNoTrace() *p {
gp := getg()

// g.m.p不为0才会走到这里
if gp.m.p == 0 {
throw("releasep: invalid arg")
}

// p
pp := gp.m.p.ptr()
// p.m不是当前m or p.status不是_Prunning
if pp.m.ptr() != gp.m || pp.status != _Prunning {
print("releasep: m=", gp.m, " m->p=", gp.m.p.ptr(), " p->m=", hex(pp.m), " p->status=", pp.status, "\n")
throw("releasep: invalid p state")
}
// m.p取消绑定
gp.m.p = 0
// p.m取消绑定
pp.m = 0
// 重置为默认状态_Pidle
pp.status = _Pidle
return pp
}

// 当m执行syscall或锁定时,让出p给其他m或者把p放回空闲队列
func handoffp(pp *p) {
// 本地队列不为空 or 全局队列不为空
if !runqempty(pp) || sched.runqsize != 0 {
// 从midle空闲链表中拿一个m绑定p并唤醒(可能拿不到p)
startm(pp, false, false)
return
}

// GC已启动(gcStart时为true) and 还有标记任务可以执行
if gcBlackenEnabled != 0 && gcMarkWorkAvailable(pp) {
// 从midle空闲链表中拿一个m绑定p并唤醒(可能拿不到p)
startm(pp, false, false)
return
}

// 没有任务可以执行

// 自旋等待的m
if sched.nmspinning.Load()+sched.npidle.Load() == 0 && sched.nmspinning.CompareAndSwap(0, 1) {
// 重置,其他m不需要自旋让出p
sched.needspinning.Store(0)
// 从midle空闲链表中拿一个m绑定p并唤醒(可能拿不到p)
startm(pp, true, false)
return
}

// 调度器加锁
lock(&sched.lock)
// STW,当前p释放到空闲队列
if sched.gcwaiting.Load() {
// 状态改为_Pgcstop
pp.status = _Pgcstop
// p的停止时间
pp.gcStopTime = nanotime()
// 待_Pgcstop的p数量-1
sched.stopwait--
// 所有p已_Pgcstop
if sched.stopwait == 0 {
// 唤醒m(m放在stopnote.key)执行GC
notewakeup(&sched.stopnote)
}
unlock(&sched.lock)
return
}
// GC标志需要执行safePointFn and 重置runSafePointFn成功
if pp.runSafePointFn != 0 && atomic.Cas(&pp.runSafePointFn, 1, 0) {
// 执行safePointFn
sched.safePointFn(pp)
// 计数器safePointWait-=1(safePointWait=gomaxprocs-1)
sched.safePointWait--
// 如果是最后一个p(其他p都已经执行了safePointFn)
if sched.safePointWait == 0 {
// 唤醒m(m放在safePointNote.key)
notewakeup(&sched.safePointNote)
}
}
// 全局队列不为空
if sched.runqsize != 0 {
unlock(&sched.lock)
// 从midle空闲链表中拿一个m绑定p并唤醒(可能拿不到p)
startm(pp, false, false)
return
}

// p是最后一放入空闲队列的(其他p都在空闲队列) and 当前没有进行netpoll轮询
if sched.npidle.Load() == gomaxprocs-1 && sched.lastpoll.Load() != 0 {
// 解锁
unlock(&sched.lock)
// 从midle空闲链表中拿一个m绑定p并唤醒(可能拿不到p)
startm(pp, false, false)
return
}

// p不是最后一个放进空闲队列的 or 正在执行netpoll轮询

// 获取最小堆的最小when
when := pp.timers.wakeTime()
// 把p放进空闲链表
pidleput(pp, 0)
// 解锁
unlock(&sched.lock)

// 中断netpoll轮询
if when != 0 {
// 如果正在轮询netpoll且有更早过期的事件,中断netpoll轮询。如果没有轮询netpoll,则从空闲队列拿一个p和一个m绑定并唤醒(可能拿不到p)
wakeNetPoller(when)
}
}

netpoll

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 如果正在轮询netpoll且有更早过期的事件,中断netpoll轮询。如果没有轮询netpoll,则从空闲队列拿一个p和一个m绑定并唤醒(可能拿不到p)
func wakeNetPoller(when int64) {
// 正在轮询netpoll中
if sched.lastpoll.Load() == 0 {
pollerPollUntil := sched.pollUntil.Load()
// 轮询完毕 or 有更早过期的事件
if pollerPollUntil == 0 || pollerPollUntil > when {
// 中断netpoll轮询
netpollBreak()
}
} else {
// 非plan9
if GOOS != "plan9" {
// 从空闲队列拿一个p和一个m绑定并唤醒(可能拿不到p)
wakep()
}
}
}

安全点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// 执行safePointFn,如果是最后一个p,则唤醒safePointNote
func runSafePointFn() {
p := getg().m.p.ptr()

// 重置runSafePointFn
if !atomic.Cas(&p.runSafePointFn, 1, 0) {
return
}

// 执行safePointFn
sched.safePointFn(p)

// 调度器加锁
lock(&sched.lock)
// 计数器safePointWait-=1(safePointWait=gomaxprocs-1)
sched.safePointWait--
// 如果是最后一个p(其他p都已经执行了safePointFn)
if sched.safePointWait == 0 {
// 唤醒m(m放在safePointNote.key)
notewakeup(&sched.safePointNote)
}
// 解锁
unlock(&sched.lock)
}

寄存器数据保存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 保存到sched字段
func save(pc, sp, bp uintptr) {
gp := getg()

// 如果是g0或者是gsignal
if gp == gp.m.g0 || gp == gp.m.gsignal {
throw("save on system g not allowed")
}

// 保存到sched字段
gp.sched.pc = pc
gp.sched.sp = sp
gp.sched.lr = 0
gp.sched.ret = 0
gp.sched.bp = bp

// 确保ctxt为0
if gp.sched.ctxt != nil {
badctxt()
}
}

抢占相关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
// 遍历p,抢占超时的g、回收阻塞在syscall的p
func retake(now int64) uint32 {
// 强制回收p的数量
n := 0

// allp加锁
lock(&allpLock)

// 遍历allp,因为会临时解锁,不能用range
for i := 0; i < len(allp); i++ {
// p
pp := allp[i]
// 扩容中
if pp == nil {
// 不处理当前p
continue
}
// sysmon纪录的的当前p的调度次数
pd := &pp.sysmontick
// 下面只判断_Prunning、_Psyscall两种状态
s := pp.status
// m是否进入syscall
sysretake := false

if s == _Prunning || s == _Psyscall {
// p的调度次数
t := int64(pp.schedtick)
// p的调度次数跟sysmon纪录的的调度次数不一致
if int64(pd.schedtick) != t {
// 同步调度次数
pd.schedtick = uint32(t)
pd.schedwhen = now
} else if pd.schedwhen+forcePreemptNS <= now {
// 调度次数一致,但超时了(10ms)

// 设置g、p抢占标志、发送抢占信号给m
preemptone(pp)
// m进入syscall,preemptone失效
sysretake = true
}
}

// syscall时重试
if s == _Psyscall {
// p的syscall次数
t := int64(pp.syscalltick)
// m没有进入syscall and p与sysmon纪录的syscall次数不一致
if !sysretake && int64(pd.syscalltick) != t {
// 同步syscall次数
pd.syscalltick = uint32(t)
pd.syscallwhen = now
// 不处理当前p
continue
}

// m进入syscall or p与sysmon纪录的syscall次数一致

// 本地队列为空 and 自旋m数量+空闲p数量>0 and syscall次数一致但没有超时(10ms)
if runqempty(pp) && sched.nmspinning.Load()+sched.npidle.Load() > 0 && pd.syscallwhen+10*1000*1000 > now {
// 不处理当前p
continue
}

// allp解锁,以便接下来调度器加锁
unlock(&allpLock)

// 空闲的locked的m数量-=1,假装是locked的m在运行
incidlelocked(-1)
// 强制回收p
// p状态改为_Pidle
if atomic.Cas(&pp.status, s, _Pidle) {
// 强制回收p的数量+1
n++
// p的syscall次数+=1
pp.syscalltick++
// 当m执行syscall或锁定时,让出p给其他m或者把p放回空闲队列
handoffp(pp)
}
// 复原,空闲的locked的m数量+=1,检查是否存在死锁
incidlelocked(1)
// 重新加锁
lock(&allpLock)
}
}
unlock(&allpLock)
return uint32(n)
}

// 逐个设置p的抢占标志,发送信号给线程
func preemptall() bool {
res := false
// 遍历所有p
for _, pp := range allp {
// p的状态非_Prunning
if pp.status != _Prunning {
continue
}
// 设置g、p抢占标志、发送抢占信号给m
if preemptone(pp) {
res = true
}
}
// 有一个成功即为true
return res
}

// 设置g、p抢占标志、发送抢占信号给m
func preemptone(pp *p) bool {
mp := pp.m.ptr()
// m为nil or m为当前m
if mp == nil || mp == getg().m {
return false
}
// g
gp := mp.curg
// g为nil or m为g0
if gp == nil || gp == mp.g0 {
return false
}

// g设置标志位(可能设置错了g,也可能不执行)
gp.preempt = true

// stackguard0(用于正常函数调用时检查)设置为0xfffffade
gp.stackguard0 = stackPreempt

// preemptMSupported默认为true and asyncpreemptoff默认为0(为1时禁用基于信号的异步抢占)
if preemptMSupported && debug.asyncpreemptoff == 0 {
// p设置标志位
pp.preempt = true
// 发送抢占信号给m(平台相关)
preemptM(mp)
}

return true
}

// g状态修改、解除与m的绑定,重新寻找并运行可运行的g(该函数永不返回)
func preemptPark(gp *g) {
// 获取g.atomicstatus状态
status := readgstatus(gp)
// 异常,_Grunning位未设置
if status&^_Gscan != _Grunning {
dumpgstatus(gp)
throw("bad g status")
}


// asyncSafePoint
if gp.asyncSafePoint {
// double-check
f := findfunc(gp.sched.pc)
if !f.valid() {
throw("preempt at unknown pc")
}
if f.flag&abi.FuncFlagSPWrite != 0 {
println("runtime: unexpected SPWRITE function", funcname(f), "in async preempt")
throw("preempt SPWRITE")
}
}

// 从_Grunning状态改为_Gscanpreempted
casGToPreemptScan(gp, _Grunning, _Gscan|_Gpreempted)

// g、m解除绑定
dropg()

// 从_Gscanpreempted状态改为_Gpreempted
casfrom_Gscanstatus(gp, _Gscan|_Gpreempted, _Gpreempted)

// 寻找并运行可运行的g(该函数永不返回)
schedule()
}

系统调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
// 修改g状态、纪录寄存器状态,唤醒sysmon、GC等线程
func reentersyscall(pc, sp, bp uintptr) {
gp := getg()

// m加锁
gp.m.locks++

// stackguard0(用于正常函数调用时检查)设置为0xfffffade
gp.stackguard0 = stackPreempt
// 禁止栈分裂/扩容,否则抛出异常
gp.throwsplit = true

// 存储到g.sched
save(pc, sp, bp)
// 同步到syscall相关字段
gp.syscallsp = sp
gp.syscallpc = pc
gp.syscallbp = bp

// 从_Grunning状态改为_Gsyscall
casgstatus(gp, _Grunning, _Gsyscall)

// staticLockRanking默认为false
if staticLockRanking {
save(pc, sp, bp)
}

// 超过边界,异常
if gp.syscallsp < gp.stack.lo || gp.stack.hi < gp.syscallsp {
systemstack(func() {
print("entersyscall inconsistent sp ", hex(gp.syscallsp), " [", hex(gp.stack.lo), ",", hex(gp.stack.hi), "]\n")
throw("entersyscall")
})
}
// 超过边界,异常
if gp.syscallbp != 0 && gp.syscallbp < gp.stack.lo || gp.stack.hi < gp.syscallbp {
systemstack(func() {
print("entersyscall inconsistent bp ", hex(gp.syscallbp), " [", hex(gp.stack.lo), ",", hex(gp.stack.hi), "]\n")
throw("entersyscall")
})
}

// sysmon挂起休眠了
if sched.sysmonwait.Load() {
// entersyscall_sysmon
// 有工作了,唤醒sysmon
systemstack(entersyscall_sysmon)
// 存储到g.sched
save(pc, sp, bp)
}

// GC标志需要执行safePointFn
if gp.m.p.ptr().runSafePointFn != 0 {
// 执行runSafePointFn函数
// 执行safePointFn,如果是最后一个p,则唤醒safePointNote
systemstack(runSafePointFn)
// 存储到g.sched
save(pc, sp, bp)
}

// 纪录p的syscall次数到m
gp.m.syscalltick = gp.m.p.ptr().syscalltick
// p
pp := gp.m.p.ptr()
// p取消m绑定
pp.m = 0
// m纪录p到oldp
gp.m.oldp.set(pp)
// m取消p绑定
gp.m.p = 0
// 状态改为_Psyscall
atomic.Store(&pp.status, _Psyscall)
// STW,当前p释放到空闲队列
if sched.gcwaiting.Load() {
// 执行entersyscall_gcwait
// p停止,唤醒m执行GC
systemstack(entersyscall_gcwait)
// 存储到g.sched
save(pc, sp, bp)
}

// 解锁
gp.m.locks--
}

// 修改g状态、纪录寄存器状态,唤醒sysmon、GC等线程
func entersyscall() {
fp := getcallerfp()
// 修改g状态、纪录寄存器状态,唤醒sysmon、GC等线程
reentersyscall(sys.GetCallerPC(), sys.GetCallerSP(), fp)
}

// 有工作了,唤醒sysmon
func entersyscall_sysmon() {
// 调度器加锁
lock(&sched.lock)
// sysmon空闲等待中
if sched.sysmonwait.Load() {
// 取消等待标记
sched.sysmonwait.Store(false)
// 唤醒sysmon(m放在sysmonnote.key)
notewakeup(&sched.sysmonnote)
}
// 解锁
unlock(&sched.lock)
}

// p停止,唤醒m执行GC
func entersyscall_gcwait() {
gp := getg()
// syscall前使用的p
pp := gp.m.oldp.ptr()

// 调度器加锁
lock(&sched.lock)
// GC and p从_Psyscall状态改为_Pgcstop
if sched.stopwait > 0 && atomic.Cas(&pp.status, _Psyscall, _Pgcstop) {
// p的停止时间
pp.gcStopTime = nanotime()
// p的syscall次数+=1
pp.syscalltick++
// 待_Pgcstop的p数量-1,如果所有p已_Pgcstop
if sched.stopwait--; sched.stopwait == 0 {
// 唤醒m(m放在stopnote.key)执行GC
notewakeup(&sched.stopnote)
}
}
// 解锁
unlock(&sched.lock)
}

// 修改g状态、纪录寄存器状态,让出p给其他m或者把p放回空闲队列
func entersyscallblock() {
gp := getg()

// m加锁
gp.m.locks++
// 禁止栈分裂/扩容,否则抛出异常
gp.throwsplit = true
// stackguard0(用于正常函数调用时检查)设置为0xfffffade
gp.stackguard0 = stackPreempt
// 纪录p的syscall次数到m
gp.m.syscalltick = gp.m.p.ptr().syscalltick
// p的syscall次数+=1
gp.m.p.ptr().syscalltick++

//
pc := sys.GetCallerPC()
sp := sys.GetCallerSP()
bp := getcallerfp()
// 存储到g.sched
save(pc, sp, bp)
// 同步到syscall相关字段
gp.syscallsp = gp.sched.sp
gp.syscallpc = gp.sched.pc
gp.syscallbp = gp.sched.bp

// 超过边界,异常
if gp.syscallsp < gp.stack.lo || gp.stack.hi < gp.syscallsp {
sp1 := sp
sp2 := gp.sched.sp
sp3 := gp.syscallsp
systemstack(func() {
print("entersyscallblock inconsistent sp ", hex(sp1), " ", hex(sp2), " ", hex(sp3), " [", hex(gp.stack.lo), ",", hex(gp.stack.hi), "]\n")
throw("entersyscallblock")
})
}

// 从_Grunning状态改为_Gsyscall
casgstatus(gp, _Grunning, _Gsyscall)

// 超过边界,异常
if gp.syscallsp < gp.stack.lo || gp.stack.hi < gp.syscallsp {
systemstack(func() {
print("entersyscallblock inconsistent sp ", hex(sp), " ", hex(gp.sched.sp), " ", hex(gp.syscallsp), " [", hex(gp.stack.lo), ",", hex(gp.stack.hi), "]\n")
throw("entersyscallblock")
})
}
// 超过边界,异常
if gp.syscallbp != 0 && gp.syscallbp < gp.stack.lo || gp.stack.hi < gp.syscallbp {
systemstack(func() {
print("entersyscallblock inconsistent bp ", hex(bp), " ", hex(gp.sched.bp), " ", hex(gp.syscallbp), " [", hex(gp.stack.lo), ",", hex(gp.stack.hi), "]\n")
throw("entersyscallblock")
})
}

// 执行entersyscallblock_handoff
// 当m执行syscall或锁定时,让出p给其他m或者把p放回空闲队列
systemstack(entersyscallblock_handoff)

// 存储到g.sched
save(sys.GetCallerPC(), sys.GetCallerSP(), getcallerfp())

// m解锁
gp.m.locks--
}

// 当m执行syscall或锁定时,让出p给其他m或者把p放回空闲队列
func entersyscallblock_handoff() {
// 当m执行syscall或锁定时,让出p给其他m或者把p放回空闲队列
handoffp(releasep())
}

// m尝试绑定syscall前使用的p,或者从空闲p列表拿一个绑定,绑定成功返回,失败则把m放到空闲队列
func exitsyscall() {
gp := getg()

// m加锁
gp.m.locks++

if sys.GetCallerSP() > gp.syscallsp {
throw("exitsyscall: syscall frame is no longer valid")
}

// 重置
gp.waitsince = 0
// syscall前使用的p
oldp := gp.m.oldp.ptr()
// 重置该字段
gp.m.oldp = 0
// m尝试绑定syscall前使用的p,或者从空闲p列表拿一个绑定
if exitsyscallfast(oldp) {
// pprof相关
if goroutineProfile.active {
systemstack(func() {
// 在写屏障路径上记录g的profile相关信息
tryRecordGoroutineProfileWB(gp)
})
}

// p的syscall次数+=1
gp.m.p.ptr().syscalltick++
// 从_Gsyscall状态改为_Grunning
casgstatus(gp, _Gsyscall, _Grunning)

// GC未运行
//
gp.syscallsp = 0
// m解锁
gp.m.locks--
// g被抢占
if gp.preempt {
// stackguard0(用于正常函数调用时检查)设置为0xfffffade
gp.stackguard0 = stackPreempt
} else {
// 非抢占
// stack.lo+928
gp.stackguard0 = gp.stack.lo + stackGuard
}
// 允许栈分裂/扩容
gp.throwsplit = false

// GC时只有sys类型的g允许运行
if sched.disable.user && !schedEnabled(gp) {
// 同协程yield关键字,当前g让出CPU,g0执行调度运行其他g,非抢占
Gosched()
}

return
}

// 拿不到p

// m解锁
gp.m.locks--

// 当前g切换到g0,运行exitsyscall0函数
// 能拿到p则绑定执行g,拿不到时,如果有locked的m则让出CPU等待locked的g可运行,否则把当前m放到空闲m列表
mcall(exitsyscall0)

//
gp.syscallsp = 0
// p的syscall次数+=1
gp.m.p.ptr().syscalltick++
// 允许栈分裂/扩容
gp.throwsplit = false
}

// 尝试绑定m和oldp,或者从空闲p列表拿一个绑定
func exitsyscallfast(oldp *p) bool {
// freezeStopWait表示发生panic
if sched.stopwait == freezeStopWait {
return false
}

// 尝试将oldp的状态从_Psyscall改为_Pidle
if oldp != nil && oldp.status == _Psyscall && atomic.Cas(&oldp.status, _Psyscall, _Pidle) {
// 绑定m和oldp
wirep(oldp)
// 同步m与p的syscall次数
exitsyscallfast_reacquired(trace)
return true
}

// oldp为nil or oldp状态非_Psyscall or 状态修改失败

// 空闲p链表有数据
if sched.pidle != 0 {
var ok bool
systemstack(func() {
// 从空闲链表拿一个p并绑定m
ok = exitsyscallfast_pidle()
})
// 拿到了
if ok {
return true
}
}
// 没拿到
return false
}

// 同步m与p的syscall次数
func exitsyscallfast_reacquired(trace traceLocker) {
gp := getg()
// m与p的syscall次数不一致
if gp.m.syscalltick != gp.m.p.ptr().syscalltick {
// p的syscall次数+=1
gp.m.p.ptr().syscalltick++
}
}

// 从空闲链表拿一个p并绑定m
func exitsyscallfast_pidle() bool {
// 调度器加锁
lock(&sched.lock)
// 从空闲链表拿一个p,如果没拿到则通知所有m,让其中一个m让出p并进入自旋等待
pp, _ := pidleget(0)
// 拿到p and sysmon挂起休眠了
if pp != nil && sched.sysmonwait.Load() {
// 取消等待标记
sched.sysmonwait.Store(false)
// 唤醒sysmon(m放在sysmonnote.key)
notewakeup(&sched.sysmonnote)
}
// 解锁
unlock(&sched.lock)

// 拿到p
if pp != nil {
// 绑定m和p,并清理p.mcache
acquirep(pp)
return true
}
return false
}

// 用于runtime,能拿到p则绑定执行g,拿不到时,如果有locked的m则让出CPU等待locked的g可运行,否则把当前m放到空闲m列表
func exitsyscall0(gp *g) {
// 从_Gsyscall状态改为_Grunnable
casgstatus(gp, _Gsyscall, _Grunnable)

// g、m解除绑定
dropg()

// 调度器加锁
lock(&sched.lock)

var pp *p
// GC开始到标记结束这个过程只允许sys类型的g运行
if schedEnabled(gp) {
// 从空闲链表拿一个p,如果没拿到则通知所有m,让其中一个m让出p并进入自旋等待
pp, _ = pidleget(0)
}

var locked bool

// 没有拿到p
if pp == nil {
// 把g放到全局队列尾部
globrunqput(gp)
// 是否有locked的m
locked = gp.lockedm != 0
} else if sched.sysmonwait.Load() {
// 拿到p,sysmon挂起休眠了

// 取消等待标记
sched.sysmonwait.Store(false)
// 唤醒sysmon(m放在sysmonnote.key)
notewakeup(&sched.sysmonnote)
}
// 解锁
unlock(&sched.lock)

// 拿到p
if pp != nil {
// 绑定m和p,并清理p.mcache
acquirep(pp)
// g调整状态等数据,和m双向绑定,最后切换到g的上下文执行(该函数永不返回)
execute(gp, false)
}

// 没有拿到p

// lockedm不为空
if locked {
// m让出p并挂起休眠,直到lockedg状态变为可运行,被唤醒后绑定一个p返回
stoplockedm()
// g调整状态等数据,和m双向绑定,最后切换到g的上下文执行(该函数永不返回)
execute(gp, false)
}

// lockedm为空

// 把m放到midle空闲链表并挂起休眠,被唤醒后绑定一个p返回
stopm()
// 被唤醒

// 寻找并运行可运行的g(该函数永不返回)
schedule()
}

main相关

init初始化函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
// 执行init函数,可以是runtime相关或module相关
// var runtime_inittasks []*initTask
func doInit(ts []*initTask) {
for _, t := range ts {
doInit1(t)
}
}

// 执行单个init函数
func doInit1(t *initTask) {
switch t.state {
case 2: // 完成
return
case 1: // 执行中
throw("recursive call during initialization - linker skew")
default:// 0-未开始
// 设置为执行中
t.state = 1

var (
start int64 // 开始时刻-单调时钟
before tracestat // 原状态
)

// inittrace-全局变量
if inittrace.active {
start = nanotime()
before = inittrace
}

// init函数数量
if t.nfns == 0 {
throw("inittask with no functions")
}

// 定位到第一个init函数的位置
firstFunc := add(unsafe.Pointer(t), 8)
// 执行所有init函数
for i := uint32(0); i < t.nfns; i++ {
p := add(firstFunc, uintptr(i)*goarch.PtrSize)
f := *(*func())(unsafe.Pointer(&p))
f()
}

// 收尾
if inittrace.active {
// 纪录结束时刻
end := nanotime()
// 当前状态
after := inittrace

f := *(*func())(unsafe.Pointer(&firstFunc))
pkg := funcpkgpath(findfunc(abi.FuncPCABIInternal(f)))

// 打印耗时等信息
var sbuf [24]byte
print("init ", pkg, " @")
print(string(fmtNSAsMS(sbuf[:], uint64(start-runtimeInitTime))), " ms, ")
print(string(fmtNSAsMS(sbuf[:], uint64(end-start))), " ms clock, ")
print(string(itoa(sbuf[:], after.bytes-before.bytes)), " bytes, ")
print(string(itoa(sbuf[:], after.allocs-before.allocs)), " allocs")
print("\n")
}

// 标记完成
t.state = 2
}
}

GC相关

启动sweeper、scavenger

1
2
3
4
5
6
7
8
9
10
11
12
// 启动sweeper、scavenger
func gcenable() {
c := make(chan int, 2)
// sweeper
go bgsweep(c)
// scavenger
go bgscavenge(c)
<-c
<-c
// 运行时已完成初始化,可以执行GC
memstats.enablegc = true
}

GC调度

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 本地/全局队列为空时,执行netpoll轮询
func pollWork() bool {
// 全局队列不为空
if sched.runqsize != 0 {
return true
}
p := getg().m.p.ptr()
// 本地队列不为空
if !runqempty(p) {
return true
}
// netpoll已初始化 and 挂起的g数量不为0 and 当前没有进行netpoll轮询
if netpollinited() && netpollAnyWaiters() && sched.lastpoll.Load() != 0 {
// 平台相关
// 执行epollWait检查,0-没有数据立即返回
if list, delta := netpoll(0); !list.empty() {
// 修改g状态放进本地/全局队列,并尝试唤醒m处理
injectglist(&list)
// 计数器+=delta => netpollWaiters+=delta
netpollAdjustWaiters(delta)
return true
}
}
return false
}

sysmon相关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
// 空闲时轮询netpoll,其他时候,抢占超时的g、回收阻塞在syscall的p
func sysmon() {
// 调度器加锁
lock(&sched.lock)
// sys类型的m数量+=1
sched.nmsys++
// 检查是否存在死锁
checkdead()
// 解锁
unlock(&sched.lock)

lasttrace := int64(0)
idle := 0
delay := uint32(0)

// 永久循环
for {
if idle == 0 { // 休眠20us
delay = 20
} else if idle > 50 { // 50次20us后,双倍休眠时间
delay *= 2
}
if delay > 10*1000 { // 最多休眠10ms
delay = 10 * 1000
}
// 开始前,先休眠一段时间
usleep(delay)

now := nanotime()
// schedtrace默认为0 and (STW,当前p释放到空闲队列 or 所有的p都空闲)
if debug.schedtrace <= 0 && (sched.gcwaiting.Load() || sched.npidle.Load() == gomaxprocs) {
// 调度器加锁
lock(&sched.lock)
// STW,当前p释放到空闲队列 or 所有的p都空闲
if sched.gcwaiting.Load() || sched.npidle.Load() == gomaxprocs {
syscallWake := false
// 遍历所有P,找到全局最小的when
next := timeSleepUntil()
// 还未到超时时刻
if next > now {
// 标记空闲等待中
sched.sysmonwait.Store(true)
// 休眠前先解锁
unlock(&sched.lock)
// sleep = forcegcperiod/2 = 1min
sleep := forcegcperiod / 2
// 下一个超时时刻在1min内
if next-now < sleep {
sleep = next - now
}
// 非Windows下为true,sleep >= 0
shouldRelax := sleep >= osRelaxMinNS
if shouldRelax {
// 非Windows下为空函数
osRelax(true)
}

// 挂起休眠指定时间(m放在sysmonnote.key),标记blocked为true
syscallWake = notetsleep(&sched.sysmonnote, sleep)
// 被唤醒/超时

if shouldRelax {
// 非Windows下为空函数
osRelax(false)
}
// 唤醒后重新加锁
lock(&sched.lock)
// 取消等待标记
sched.sysmonwait.Store(false)
// 将sysmonnote.key重置为0
noteclear(&sched.sysmonnote)
}
// 成功唤醒
if syscallWake {
idle = 0
delay = 20
}
}
// 解锁
unlock(&sched.lock)
}

// 从这里到末尾都要加锁访问
lock(&sched.sysmonlock)

// 当前时刻
now = nanotime()

// cgo
if *cgo_yield != nil {
asmcgocall(*cgo_yield, nil)
}
// 上一次执行netpoll的时刻
lastpoll := sched.lastpoll.Load()
// netpoll已初始化 and 当前没有进行netpoll轮询 and lastpoll距离现在没有超过10ms
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
// lastpoll更新为当前时刻
sched.lastpoll.CompareAndSwap(lastpoll, now)
// 执行epollWait检查,0-没有数据立即返回
list, delta := netpoll(0)
// 有数据
if !list.empty() {
// 空闲的locked的m数量-=1,假装是locked的m在运行
incidlelocked(-1)
// 修改g状态放进本地/全局队列,并尝试唤醒m处理
injectglist(&list)
// 复原,空闲的locked的m数量+=1,检查是否存在死锁
incidlelocked(1)
// 计数器+=delta => netpollWaiters+=delta
netpollAdjustWaiters(delta)
}
}
// netbsd相关
if GOOS == "netbsd" && needSysmonWorkaround {
if next := timeSleepUntil(); next < now {
startm(nil, false, false)
}
}
// 为1时sysmon将会唤醒scavenger
if scavenger.sysmonWake.Load() != 0 {
// 重置scavenger状态,修改g状态放进本地/全局队列,并尝试唤醒m处理
scavenger.wake()
}

// 遍历p,抢占超时的g、回收阻塞在syscall的p
if retake(now) != 0 {
// 有回收p
idle = 0
} else {
// 无回收p
idle++
}

//
if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && forcegc.idle.Load() {
// forcegc加锁
lock(&forcegc.lock)
// 重置idle
forcegc.idle.Store(false)
var list gList
// 把g放到链表
list.push(forcegc.g)
// 修改g状态放进本地/全局队列,并尝试唤醒m处理
injectglist(&list)
// 解锁
unlock(&forcegc.lock)
}
// schedtrace默认为0,忽略
if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
lasttrace = now
schedtrace(debug.scheddetail > 0)
}
// 解锁
unlock(&sched.sysmonlock)
}
}

cgo相关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// 创建模板线程,每次唤醒时创建所有locked和cgo类型的m
func startTemplateThread() {
// wasm只有一个线程
if GOARCH == "wasm" {
return
}

mp := acquirem()
// 原子设置haveTemplateThread为1
if !atomic.Cas(&newmHandoff.haveTemplateThread, 0, 1) {
// 设置失败,有其他线程已经执行了
releasem(mp)
return
}
// 清理freem链表,创建并初始化m,locked或cgo类型的m由模板线程延迟创建,其他类型则立即调用平台相关函数创建线程
// templateThread:每次唤醒时创建所有locked和cgo类型的m
newm(templateThread, nil, -1)
releasem(mp)
}

// 每次唤醒时创建所有locked和cgo类型的m
func templateThread() {
// 调度器加锁
lock(&sched.lock)
// sys类型的m数量+=1
sched.nmsys++
// 检查是否存在死锁
checkdead()
// 解锁
unlock(&sched.lock)

// 永久循环
for {
lock(&newmHandoff.lock)

// m队列不为空
for newmHandoff.newm != 0 {
// 拿到整个m链表
newm := newmHandoff.newm.ptr()
// 清除指针
newmHandoff.newm = 0
unlock(&newmHandoff.lock)

// 遍历整个m链表并创建线程运行m
for newm != nil {
// 下一个m,也是新链表的头部
next := newm.schedlink.ptr()
// m清除next指针
newm.schedlink = 0
// 调用平台相关函数创建线程运行m
newm1(newm)
// 指向下一个m
newm = next
}
lock(&newmHandoff.lock)
}

// 当前m等待唤醒,下面会进行sleep
newmHandoff.waiting = true
// 将wake.key重置为0
noteclear(&newmHandoff.wake)

unlock(&newmHandoff.lock)

// 挂起休眠(m放在wake.key),标记blocked为true
notesleep(&newmHandoff.wake)
// 被notewakeup唤醒,标记blocked为false
}
}

profiling相关

1
2
3
4
5
6
7
8
9
10
11
// 内存剖析调用栈初始化
func mProfStackInit(mp *m) {
// 为0不处理,profstackdepth默认128,最大值1024
if debug.profstackdepth == 0 {
return
}
// make([]uintptr, 1+6+debug.profstackdepth)
mp.profStack = makeProfStackFP()
// make([]uintptr, 1+6+debug.profstackdepth)
mp.mLockProfile.stack = makeProfStackFP()
}

其他

sudog相关

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
// 从sudogcache获取一个sudog
func acquireSudog() *sudog {
// sema -> acquireSudog -> new(sudog) -> malloc -> GC -> sema
// 有环,需要使用acquirem避免GC
mp := acquirem()
pp := mp.p.ptr()
// sudog列表为空,从调度器拿
if len(pp.sudogcache) == 0 {
// 调度器加锁
lock(&sched.sudoglock)
// 数据量没有超过容量一半 and 调度器的sudog列表不为空
for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil {
// 从调度器的sudog链表拿走放到p里
s := sched.sudogcache
sched.sudogcache = s.next
s.next = nil
pp.sudogcache = append(pp.sudogcache, s)
}
// 解锁
unlock(&sched.sudoglock)
// 调度器的sudog列表也是空的
if len(pp.sudogcache) == 0 {
// 使用new创建一个sudog放到p里
pp.sudogcache = append(pp.sudogcache, new(sudog))
}
}
// 拿走列表里最后一个sudog
n := len(pp.sudogcache)
s := pp.sudogcache[n-1]
pp.sudogcache[n-1] = nil
pp.sudogcache = pp.sudogcache[:n-1]
//
if s.elem != nil {
throw("acquireSudog: found s.elem != nil in cache")
}
releasem(mp)
return s
}

// 把sudog放回cache
func releaseSudog(s *sudog) {
// guard
if s.elem != nil {
throw("runtime: sudog with non-nil elem")
}
if s.isSelect {
throw("runtime: sudog with non-false isSelect")
}
if s.next != nil {
throw("runtime: sudog with non-nil next")
}
if s.prev != nil {
throw("runtime: sudog with non-nil prev")
}
if s.waitlink != nil {
throw("runtime: sudog with non-nil waitlink")
}
if s.c != nil {
throw("runtime: sudog with non-nil c")
}
gp := getg()
if gp.param != nil {
throw("runtime: releaseSudog with non-nil gp.param")
}

mp := acquirem()
pp := mp.p.ptr()
// 本地队列满了,把一半的数量放到全局队列
if len(pp.sudogcache) == cap(pp.sudogcache) {
// 链表头部和尾部指针
var first, last *sudog
for len(pp.sudogcache) > cap(pp.sudogcache)/2 {
// 拿走列表里最后一个sudog
n := len(pp.sudogcache)
p := pp.sudogcache[n-1]
pp.sudogcache[n-1] = nil
pp.sudogcache = pp.sudogcache[:n-1]
// 如果是第一个数据
if first == nil {
first = p
} else {
// 上一个sudog指向当前sudog
last.next = p
}
// 指向当前sudog
last = p
}
// 调度器加锁
lock(&sched.sudoglock)
// 链接全局sudog链表头部
last.next = sched.sudogcache
// 替换(GC时清空全局sudog缓存)
sched.sudogcache = first
// 解锁
unlock(&sched.sudoglock)
}
// 放到本地链表
pp.sudogcache = append(pp.sudogcache, s)
releasem(mp)
}

死锁检测

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
// 检查是否存在死锁
func checkdead() {
// 空函数(staticlockranking默认为false)
assertLockHeld(&sched.lock)

// 动态库(c-archive/c-shared) and 非wasm平台
if (islibrary || isarchive) && GOARCH != "wasm" {
return
}

// panic
if panicking.Load() > 0 {
return
}

var run0 int32
// 没有运行cgo and 有额外的m(Windows)
if !iscgo && cgoHasExtraM && extraMLength.Load() > 0 {
run0 = 1
}

// 用户m数量 = 累计总量 - 累计释放m数量 - 空闲m数量 - locked的m数量 - sys类型m数量
// = sched.mnext - sched.nmfreed - sched.nmidle - sched.nmidlelocked - sched.nmsys
run := mcount() - sched.nmidle - sched.nmidlelocked - sched.nmsys

// 下面条件一般情况下都满足
if run > run0 {
return
}

// run为0或1

// 异常
if run < 0 {
print("runtime: checkdead: nmidle=", sched.nmidle, " nmidlelocked=", sched.nmidlelocked, " mcount=", mcount(), " nmsys=", sched.nmsys, "\n")
unlock(&sched.lock)
throw("checkdead: inconsistent counts")
}

grunning := 0
// 每个g都执行一遍该函数,期间allglock会被锁住
forEachG(func(gp *g) {
// 是否是runtime相关的函数,除了部份如runtime.main
if isSystemGoroutine(gp, false) {
return
}
// 获取g.atomicstatus状态
s := readgstatus(gp)
switch s &^ _Gscan {
case _Gwaiting,
_Gpreempted: // _Gwaiting、_Gpreempted
grunning++
case _Grunnable,
_Grunning,
_Gsyscall: // _Grunnable、_Grunning、_Gsyscall
print("runtime: checkdead: find g ", gp.goid, " in status ", s, "\n")
unlock(&sched.lock)
throw("checkdead: runnable g")
}
})
// 如果runtime·Goexit()
if grunning == 0 {
unlock(&sched.lock)
fatal("no goroutines (main called runtime.Goexit) - deadlock!")
}

// faketime,忽略
if faketime != 0 {
if when := timeSleepUntil(); when < maxWhen {
faketime = when

pp, _ := pidleget(faketime)
if pp == nil {
unlock(&sched.lock)
throw("checkdead: no p for timer")
}
mp := mget()
if mp == nil {
unlock(&sched.lock)
throw("checkdead: no m for timer")
}
sched.nmspinning.Add(1)
mp.spinning = true
mp.nextp.set(pp)
notewakeup(&mp.park)
return
}
}

// 没有goroutine运行,检查p
for _, pp := range allp {
// 有timer
if len(pp.timers.heap) > 0 {
return
}
}

unlock(&sched.lock)
fatal("all goroutines are asleep - deadlock!")
}

参考文档

6.5 调度器
Go 系列文章2:Go 程序的启动流程
How Goroutines Work
Golang Concurrency Patterns: For-Select-Done, Errgroup and Worker Pool
Dmitry Vyukov — Go scheduler: Implementing language with lightweight concurrency
GopherCon 2018: Bryan C. Mills - Rethinking Classical Concurrency Patterns
Illustrated Tales of Go Runtime Scheduler.
Go: Goroutine, OS Thread and CPU Management
GopherCon 2018: Kavya Joshi - The Scheduler Saga
Goroutines: M, P, G orchestration
Golang源码探索(二) 协程的实现原理
Scheduling Internals.
Go 系列文章4 : 调度器