1. 程式人生 > >【我的區塊鏈之路】- golang原始碼分析之協程排程器底層實現( G、M、P)

【我的區塊鏈之路】- golang原始碼分析之協程排程器底層實現( G、M、P)

本人的原始碼是基於go 1.9.7 版本的哦!

緊接著之前寫的 【我的區塊鏈之路】- golang原始碼分析之select的底層實現 和 【我的區塊鏈之路】- golang原始碼分析之channel的底層實現 我們這一次需要對go的排程器做一番剖析。

go的排程器只要實現在 runtime 包中,路徑為: ./src/runtime/proc.go 檔案中。

我們都知道go的強大是因為可以起很多 goroutine 也即是我們所說的協程。那麼協程和執行緒有什麼聯絡呢?協程又是如何排程的呢?

在逼逼這些東西之前,我們先了解下,go語言其實是在作業系統提供的核心執行緒之上搭建了一個特有得 【兩級執行緒

】模型。下面再說兩級執行緒模型前,有三個必知的核心元素。(G、M、P)

G:Goroutine的縮寫,一個G代表了對一段需要被執行的Go語言程式碼的封裝

M:Machine的縮寫,一個M代表了一個核心執行緒

P:Processor的縮寫,一個P代表了M所需的上下文環境

簡單的來說,一個G的執行需要M和P的支援。一個M在與一個P關聯之後形成了一個有效的G執行環境【核心執行緒 + 上下文環境】。每個P都會包含一個可執行的G的佇列 (runq )。

好了下面我們來具體的看看 G、M、P

M (machine):

M是machine的頭文字, 在當前版本的golang中等同於系統執行緒

.
M可以執行兩種程式碼:

  • go程式碼, 即goroutine, M執行go程式碼需要一個P
  • 原生程式碼, 例如阻塞的syscall, M執行原生程式碼不需要P

M會從執行佇列中取出G, 然後執行G, 如果G執行完畢或者進入休眠狀態, 則從執行佇列中取出下一個G執行, 周而復始。
有時候G需要呼叫一些無法避免阻塞的原生程式碼, 這時M會釋放持有的P並進入阻塞狀態, 其他M會取得這個P並繼續執行佇列中的G.
go需要保證有足夠的M可以執行G, 不讓CPU閒著, 也需要保證M的數量不能過多通常建立一個M的原因是由於沒有足夠的M來關聯P並執行其中可執行的G。而且執行時系統執行系統監控的時候,或者GC的時候也會建立M

M的結構體定義:(在 ./src/runtime/runtime2.go 檔案中)

// M 結構體
type m struct {
    /*
        1.  所有呼叫棧的Goroutine,這是一個比較特殊的Goroutine。
        2.  普通的Goroutine棧是在Heap分配的可增長的stack,而g0的stack是M對應的執行緒棧。
        3.  所有排程相關程式碼,會先切換到該Goroutine的棧再執行。
    */
	g0      *g     // goroutine with scheduling stack
	morebuf gobuf  // gobuf arg to morestack
	divmod  uint32 // div/mod denominator for arm - known to liblink

	// Fields not known to debuggers.
	procid        uint64       // for debuggers, but offset not hard-coded
	gsignal       *g           // signal-handling g
	goSigStack    gsignalStack // Go-allocated signal handling stack
	sigmask       sigset       // storage for saved signal mask
	tls           [6]uintptr   // thread-local storage (for x86 extern register)
	mstartfn      func()       // 

	curg          *g       //   M 正在執行的結構體G
	caughtsig     guintptr // goroutine running during fatal signal
	p             puintptr // attached p for executing go code (nil if not executing go code)
	nextp         puintptr
	id            int32
	mallocing     int32
	throwing      int32
	preemptoff    string // if != "", keep curg running on this m
	locks         int32
	softfloat     int32
	dying         int32
	profilehz     int32
	helpgc        int32
	spinning      bool // m is out of work and is actively looking for work
	blocked       bool // m is blocked on a note
	inwb          bool // m is executing a write barrier
	newSigstack   bool // minit on C thread called sigaltstack
	printlock     int8
	incgo         bool // m is executing a cgo call
	fastrand      uint32
	ncgocall      uint64      // number of cgo calls in total
	ncgo          int32       // number of cgo calls currently in progress
	cgoCallersUse uint32      // if non-zero, cgoCallers in use temporarily
	cgoCallers    *cgoCallers // cgo traceback if crashing in cgo call
	park          note
	alllink       *m // on allm
	schedlink     muintptr
	mcache        *mcache
	lockedg       *g          // 表示與當前M鎖定那個g
	createstack   [32]uintptr // stack that created this thread.
	freglo        [16]uint32  // d[i] lsb and f[i]
	freghi        [16]uint32  // d[i] msb and f[i+16]
	fflag         uint32      // floating point compare flags
	locked        uint32      // tracking for lockosthread
	nextwaitm     uintptr     // next m waiting for lock
	needextram    bool
	traceback     uint8
	waitunlockf   unsafe.Pointer // todo go func(*g, unsafe.pointer) bool
	waitlock      unsafe.Pointer
	waittraceev   byte
	waittraceskip int
	startingtrace bool
	syscalltick   uint32
	thread        uintptr // thread handle

	// these are here because they are too large to be on the stack
	// of low-level NOSPLIT functions.
	libcall   libcall
	libcallpc uintptr // for cpu profiler
	libcallsp uintptr
	libcallg  guintptr
	syscall   libcall // stores syscall parameters on windows

	mOS
}

M的欄位眾多,其中最重要的為下面四個:

g0: Go執行時系統在啟動之初建立的,用於執行一些執行時任務。

mstartfn:表示M的起始函式。其實就是我們 go 語句攜帶的那個函式啦。

curg:存放當前正在執行的G的指標。

p:指向當前與M關聯的那個P。

nextp:用於暫存於當前M有潛在關聯的P。 (預聯)當M重新啟動時,即用預聯的這個P做關聯啦

spinning:表示當前M是否正在尋找G。在尋找過程中M處於自旋狀態

lockedg:表示與當前M鎖定的那個G。執行時系統會把 一個M 和一個G鎖定,一旦鎖定就只能雙方相互作用,不接受第三者

M並沒有像G和P一樣的狀態標記, 但可以認為一個M有以下的狀態:

  • 自旋中(spinning): M正在從執行佇列獲取G, 這時候M會擁有一個P
  • 執行go程式碼中: M正在執行go程式碼, 這時候M會擁有一個P
  • 執行原生程式碼中: M正在執行原生程式碼或者阻塞的syscall, 這時M並不擁有P
  • 休眠中: M發現無待執行的G時會進入休眠, 並新增到空閒M連結串列中, 這時M並不擁有P

自旋中(spinning)這個狀態非常重要, 是否需要喚醒或者建立新的M取決於當前自旋中的M的數量。

M在被建立之初會被加入到全域性的M列表 【runtime.allm】 。接著,M的起始函式(mstartfn)和準備關聯的P(p)都會被設定。最後,執行時系統會為M專門建立一個新的核心執行緒並與之關聯。這時候這個新的M就為執行G做好了準備。其中起始函式(mstartfn)僅當執行時系統要用此M執行系統監控或者垃圾回收等任務的時候才會被設定。全域性M列表的作用是執行時系統在需要的時候會通過它獲取到所有的M的資訊,同時防止M被gc

在新的M被建立後回西安做一番初始化工作。其中包括了對自身所持的棧空間以及訊號做處理的初始化。在上述初始化完成後 mstartfn 函式就會被執行 (如果存在的話)。【注意】:如果mstartfn 代表的是系統監控任務的話,那麼該M會一直在執行mstartfn 而不會有後續的流程。否則 mstartfn 執行完後,當前M將會與那個準備與之關聯的P完成關聯。至此,一個併發執行環境才真正完成。之後就是M開始尋找可執行的G並執行之。

執行時系統管轄的M會在GC任務執行的時候被停止,這時候系統會對M的屬性做某些必要的重置並把M放置入排程器的空閒M列表。【很重要】因為在需要一個未被使用的M時,執行時系統會先去這個空閒列表獲取M。(只有都沒有的時候才會建立M)

M本身是無狀態的。M是否有空閒僅以它是否存在於排程器的空閒M列表 runtime.sched.midle  中為依據 (空閒列表不是那個全域性列表哦)。

單個Go程式所使用的M的最大數量是可以被設定的。在我們使用命令執行Go程式時候,有一個載入程式先會被啟動的。在這個歌載入程式中會為Go程式的執行簡歷必要的環境。載入程式對M的數量進行初始化設定,預設是 最大值 1W 【即是說,一個Go程式最多可以使用1W個M,即:理想狀態下,可以同時有1W個核心執行緒被同時執行】。使用 runtime/debug.SetMaxThreads() 函式設定。

P (process):

P是process的頭文字, 代表M執行G所需要的資源
一些講解協程的文章把P理解為cpu核心, 其實這是錯誤的.
雖然P的數量預設等於cpu核心數, 但可以通過環境變數GOMAXPROC修改, 在實際執行時P跟cpu核心並無任何關聯。

P也可以理解為控制go程式碼的並行度的機制,
如果P的數量等於1, 代表當前最多隻能有一個執行緒(M)執行go程式碼,
如果P的數量等於2, 代表當前最多隻能有兩個執行緒(M)執行go程式碼.
執行原生程式碼的執行緒數量不受P控制

因為同一時間只有一個執行緒(M)可以擁有PP中的資料都是鎖自由(lock free)的, 讀寫這些資料的效率會非常的高

P是使G能夠在M中執行的關鍵。Go執行時系統適當地讓P與不同的M建立或者斷開聯絡,以使得P中的那些可執行的G能夠在需要的時候及時獲得執行時機。

P的結構體定義:(在 ./src/runtime/runtime2.go 檔案中)

type p struct {
	lock mutex

	id          int32
	status      uint32 // one of pidle/prunning/...
	link        puintptr
	schedtick   uint32     // incremented on every scheduler call
	syscalltick uint32     // incremented on every system call
	sysmontick  sysmontick // last tick observed by sysmon
	m           muintptr   // back-link to associated m (nil if idle)
	mcache      *mcache
	racectx     uintptr

	deferpool    [5][]*_defer // pool of available defer structs of different sizes (see panic.go)
	deferpoolbuf [5][32]*_defer

	// Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen.
	goidcache    uint64
	goidcacheend uint64

	// Queue of runnable goroutines. Accessed without lock.
	runqhead uint32
	runqtail uint32
	runq     [256]guintptr
	// runnext, if non-nil, is a runnable G that was ready'd by
	// the current G and should be run next instead of what's in
	// runq if there's time remaining in the running G's time
	// slice. It will inherit the time left in the current time
	// slice. If a set of goroutines is locked in a
	// communicate-and-wait pattern, this schedules that set as a
	// unit and eliminates the (potentially large) scheduling
	// latency that otherwise arises from adding the ready'd
	// goroutines to the end of the run queue.
	runnext guintptr

	// Available G's (status == Gdead)
	gfree    *g
	gfreecnt int32

	sudogcache []*sudog
	sudogbuf   [128]*sudog

	tracebuf traceBufPtr

	// traceSweep indicates the sweep events should be traced.
	// This is used to defer the sweep start event until a span
	// has actually been swept.
	traceSweep bool
	// traceSwept and traceReclaimed track the number of bytes
	// swept and reclaimed by sweeping in the current sweep loop.
	traceSwept, traceReclaimed uintptr

	palloc persistentAlloc // per-P to avoid mutex

	// Per-P GC state
	gcAssistTime     int64 // Nanoseconds in assistAlloc
	gcBgMarkWorker   guintptr
	gcMarkWorkerMode gcMarkWorkerMode

	// gcw is this P's GC work buffer cache. The work buffer is
	// filled by write barriers, drained by mutator assists, and
	// disposed on certain GC state transitions.
	gcw gcWork

	runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point

	pad [sys.CacheLineSize]byte
}

通過runtime.GOMAXPROCS函式我們可以改變單個Go程式可以間擁有的P的最大數量。

P的最大數量相當於是對可以被併發執行的使用者級的G的數量作出限制。

每一個P都必須關聯一個M才能使其中的G得以執行

【注意】:執行時系統會將M與關聯的P分離開來。但是如果該P的可執行佇列中還有未執行的G,那麼執行時系統就會找到一個空的M (在排程器的空閒佇列中的M) 或者建立一個空的M,並與該P關聯起來(為了執行G而做準備)。

runtime.GOMAXPROCS函式設定的只會影響P的數量,但是對M (核心執行緒)的數量不會影響,所以runtime.GOMAXPROCS 並不是控制執行緒數,只能說是影響上下文環境P的數目

在Go程式開始執行時,會先由載入程式對M做了數量上的限制,及對P做了限制,P的數量預設為1。所以我們無論在程式中使用go關鍵字啟用多少goroutine,它們都會被塞到一個P的可執行G佇列中

在確認P的最大數量後,執行時系統會根據這個數值初始化全域性的P列表 【runtime.allp】,類似全域性M列表,其中包含了所有 執行時系統建立的所有P。隨後,執行時系統會把排程器的可執行G佇列【runtime.sched.runq】中的所有G均勻的放入全域性的P列表中的各個P的可執行G隊列當中。到這裡為止,執行時系統需要用到的所有P都準備就緒了。

類似M的空閒列表,排程器也存在一個P的空閒列表【runtime.sched.pidle】,當一個P不再與任何M關聯的時候,執行時系統就會把該P放入這個列表中,而一個空閒的P關聯了某個M之後會被從這個列表中取出【注意:就算一個P加入了空閒佇列,但是它的可執行G佇列不一定為空

和M不同P是有狀態的:(五種)

Pidle:當前P未和任何M關聯

Prunning:當前P正在和某個M關聯

Psyscall:當前P中的被執行的那個G正在進行系統呼叫

Pgcstop:執行時系統正在進行gc。(執行時系統在gc時會試圖把全域性P列表中的P都處於此狀態)

Pdead:當前P已經不再被使用。(在呼叫runtime.GOMAXPROCS減少P的數量時,多餘的P就處於此狀態)

P的初始狀態就是為Pgcstop,處於這個狀態很短暫,在初始化和填充P中的G佇列之後,執行時系統會將其狀態置為Pidle並放入排程器的空閒P列表 (runtime.sched.pidle)中。其中的P會由排程器根據實際情況進行取用。下圖是P在各個狀態建的流轉情況:

從上圖,我們可以看出,除了Pdead之外的其他狀態的P都會在執行時系統欲進行GC是被指為Pgcstop在gc結束後狀態不會回覆到之前的狀態的,而是都統一直接轉到了Pidle 【這意味著,他們都需要被重新排程】。【注意】:除了Pgcstop 狀態的P,其他狀態的P都會在 呼叫runtime.GOMAXPROCS 函式去減少P數目時,被認為是多餘的P而狀態轉為Pdead,這時候其帶的可執行G的佇列中的G都會被轉移到 排程器的可執行G佇列中,它的自由G佇列 【gfree】也是一樣被移到排程器的自由列表 【runtime.sched.gfree】中

【注意】:每個P中都有一個可執行G佇列自由G佇列。自由G佇列包含了很多已經完成的G,隨著被執行完成的G的積攢到一定程度後,執行時系統會把其中的部分G轉移的排程器的自由G佇列 【runtime.sched.gfree】中。

【注意】:當我們每次用 go關鍵字 啟用一個G的時候,執行時系統都會先從P的自由G佇列獲取一個G來封裝我們提供的函式 (go 關鍵字後面的函式) ,如果發現P中的自由G過少時,會從排程器的自由G佇列中移一些G過來,只有連排程器的自由G列表都彈盡糧絕的時候,才會去建立新的G。

G (goroutine):

G是goroutine的頭文字, goroutine可以解釋為受管理的輕量執行緒, goroutine使用go關鍵詞建立。

舉例來說,  func main() { go other() },  這段程式碼建立了兩個goroutine。
一個是main, 另一個是other, 【注意】:main本身也是一個goroutine

goroutine的新建, 休眠, 恢復, 停止都受到go執行時的管理。
goroutine執行非同步操作時會進入休眠狀態, 待操作完成後再恢復, 無需佔用系統執行緒
goroutine新建或恢復時會新增到執行佇列, 等待M取出並執行

G的結構體定義:(在 ./src/runtime/runtime2.go 檔案中)


type g struct {
	// Stack parameters.
	// stack describes the actual stack memory: [stack.lo, stack.hi).
	// stackguard0 is the stack pointer compared in the Go stack growth prologue.
	// It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption.
	// stackguard1 is the stack pointer compared in the C stack growth prologue.
	// It is stack.lo+StackGuard on g0 and gsignal stacks.
	// It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash).
	stack       stack   // offset known to runtime/cgo   描述了真實的棧記憶體,包括上下界
	stackguard0 uintptr // offset known to liblink
	stackguard1 uintptr // offset known to liblink

	_panic         *_panic // innermost panic - offset known to liblink
	_defer         *_defer // innermost defer
	m              *m      // current m; offset known to arm liblink   當前執行G的M
	sched          gobuf    //  goroutine切換時,用於儲存g的上下文
	syscallsp      uintptr        // if status==Gsyscall, syscallsp = sched.sp to use during gc
	syscallpc      uintptr        // if status==Gsyscall, syscallpc = sched.pc to use during gc
	stktopsp       uintptr        // expected sp at top of stack, to check in traceback
	param          unsafe.Pointer // passed parameter on wakeup   用於傳遞引數,睡眠時其他goroutine可以設定param,喚醒時該goroutine可以獲取
	atomicstatus   uint32
	stackLock      uint32 // sigprof/scang lock; TODO: fold in to atomicstatus
	goid           int64    // goroutine的ID
	waitsince      int64  // approx time when the g become blocked   g被阻塞的大體時間
	waitreason     string // if status==Gwaiting
	schedlink      guintptr
	preempt        bool     // preemption signal, duplicates stackguard0 = stackpreempt
	paniconfault   bool     // panic (instead of crash) on unexpected fault address
	preemptscan    bool     // preempted g does scan for gc
	gcscandone     bool     // g has scanned stack; protected by _Gscan bit in status
	gcscanvalid    bool     // false at start of gc cycle, true if G has not run since last scan; TODO: remove?
	throwsplit     bool     // must not split stack
	raceignore     int8     // ignore race detection events
	sysblocktraced bool     // StartTrace has emitted EvGoInSyscall about this goroutine
	sysexitticks   int64    // cputicks when syscall has returned (for tracing)
	traceseq       uint64   // trace event sequencer
	tracelastp     puintptr // last P emitted an event for this goroutine
	lockedm        *m       // G被鎖定只在這個m上執行
	sig            uint32
	writebuf       []byte
	sigcode0       uintptr
	sigcode1       uintptr
	sigpc          uintptr
	gopc           uintptr // pc of go statement that created this goroutine
	startpc        uintptr // pc of goroutine function
	racectx        uintptr
	waiting        *sudog         // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
	cgoCtxt        []uintptr      // cgo traceback context
	labels         unsafe.Pointer // profiler labels
	timer          *timer         // cached timer for time.Sleep

	// Per-G GC state

	// gcAssistBytes is this G's GC assist credit in terms of
	// bytes allocated. If this is positive, then the G has credit
	// to allocate gcAssistBytes bytes without assisting. If this
	// is negative, then the G must correct this by performing
	// scan work. We track this in bytes to make it fast to update
	// and check for debt in the malloc hot path. The assist ratio
	// determines how this corresponds to scan work debt.
	gcAssistBytes int64
}


// 用於儲存G切換時上下文的快取結構體
type gobuf struct {
	// The offsets of sp, pc, and g are known to (hard-coded in) libmach.
	//
	// ctxt is unusual with respect to GC: it may be a
	// heap-allocated funcval so write require a write barrier,
	// but gobuf needs to be cleared from assembly. We take
	// advantage of the fact that the only path that uses a
	// non-nil ctxt is morestack. As a result, gogo is the only
	// place where it may not already be nil, so gogo uses an
	// explicit write barrier. Everywhere else that resets the
	// gobuf asserts that ctxt is already nil.
	sp   uintptr     // 當前的棧指標
	pc   uintptr     // 計數器
	g    guintptr    // g自身
	ctxt unsafe.Pointer // this has to be a pointer so that gc scans it
	ret  sys.Uintreg
	lr   uintptr
	bp   uintptr // for GOEXPERIMENT=framepointer
}

下面我們來講講G。Go語言的編譯器會把我們編寫的go語句程式設計一個執行時系統的函式呼叫,並把go語句中那個函式及其引數都作為引數傳遞給這個執行時系統函式中。

執行時系統在接到這樣一個呼叫後,會先檢查一下go函式及其引數的合法性,緊接著會試圖從本地P的自由G佇列中(或者排程器的自由G佇列)中獲取一個可用的自由G (P中有講述了),如果沒有則新建立一個G。類似M和P,G在執行時系統中也有全域性的G列表【runtime.allg】,那些新建的G會先放到這個全域性的G列表中,其列表的作用也是集中放置了當前執行時系統中給所有的G的指標。在用自由G封裝go的函式時,執行時系統都會對這個G做一次初始化。

初始化:包含了被關聯的go關鍵字後的函式及當前G的狀態機G的ID等等。在G被初始化完成後就會被放置到當前本地的P的可執行佇列中。只要時機成熟,排程器會立即盡心這個G的排程執行。

G的各種狀態:

Gidle:G被建立但還未完全被初始化。

Grunnable:當前G為可執行的,正在等待被執行。

Grunning:當前G正在被執行。

Gsyscall:當前G正在被系統呼叫

Gwaiting:當前G正在因某個原因而等待

Gdead:當前G完成了執行

正在被初始化進行中的G是處於Grunnable狀態的。一個G真正被使用是在狀態為Grunnable之後。G的生命週期及狀態變化如圖:

圖上有一步是事件到來,那麼G在執行過程中,是否等待某個事件以及等待什麼樣的事件完全由起封裝的go關鍵字後的函式決定。(如:等待chan中的值、涉及網路I/O、time.Timer、time.Sleep等等事件)

G退出系統呼叫,及其複雜:執行時系統先會嘗試直接運行當前G,僅當無法被執行時才會轉成Grunnable並放置入排程器的自由G列表中。

最後,已經是Gdead狀態的G是可以被重新初始化並使用的。而對比進入Pdead狀態的P等待的命運只有被銷燬。處於Gdead的G會被放置到本地P或者排程器的自由G列表中。

至此,G、M、P的初步描述已經完畢,下面我們來看一看一些核心的佇列:

G、M、P的容器
中文名 原始碼的名稱 作用域 簡要說明
全域性M列表 runtime.allm 執行時系統 存放所有M
全域性P列表 runtime.allp 執行時系統 存放所有P
全域性G列表 runtime.allg 執行時系統 存放所有G
排程器中的空閒M列表 runtime.sched.midle 排程器 存放空閒M
排程器中的空閒P列表 runtime.sched.pidle 排程器 存放空閒P
排程器中的可執行G佇列 runtime.sched.runq 排程器 存放可執行G
排程器中那個的自由G列表 runtime.sched.gfree 排程器 存放自由G
P的可執行G佇列 runq 本地P 存放當前P中的可執行G
P中的自由G列表 gfree 本地P 存放當前P中的自由G

 三個全域性的列表主要為了統計執行時系統的的所有G、M、P。我們主要關心剩下的這些容器,尤其是和G相關的四個

在執行時系統建立的G都會被儲存在全域性的G列表中,值得注意的是:從Gsyscall轉出來的G都會被放置到排程器的可執行G佇列中。而被執行時系統初始化的G會被放置到本地P的可執行列表中。從Gwaiting轉出來的G,除了因網路I/O陷入等待的G之外,都會被放置到本地P的可執行G佇列中。轉成Gdead狀態的G會先被放置到本地P的自由G列表 (上面的描述可以知道這一點)。排程器中的與G、M、P相關的列表其實只是起了一個暫存的作用。

一句話概括三者關係:

  • G需要繫結在M上才能執行;
  • M需要繫結P才能執行;

下面我們看一看三者及核心排程實體【KSE】的關係:

 

綜上所述,一個G的執行需要M和P的支援。一個M在於一個P關聯之後就形成一個有效的G執行環境 【核心執行緒 +  上下文環境】。每個P都含有一個 可執行G的佇列【runq】。佇列中的G會被一次傳遞給本地P關聯的M並且獲得執行時機

由上圖可以看出 M 與 KSE 總是 一對一 的。一個M能且僅能代表一個核心執行緒

一個M的生命週期內,它會且僅會與一個KSE產生關聯M與P以及P與G之間的關聯是多變的總是會隨著實際排程的過程而改變。其中, M 與 P 總是一對一,P 與 G 總是 一對多, 而 一個 G 最終由 一個 M 來負責執行

上述我們講的執行時系統其實就是我們下面要說的排程器

我們再來回顧下G、M、P 中的主要成員:

G裡面比較重要的成員:

  • stack: 當前g使用的棧空間, 有lo和hi兩個成員
  • stackguard0: 檢查棧空間是否足夠的值, 低於這個值會擴張棧, 0是go程式碼使用的
  • stackguard1: 檢查棧空間是否足夠的值, 低於這個值會擴張棧, 1是原生程式碼使用的
  • m: 當前g對應的m
  • sched: g的排程資料, 當g中斷時會儲存當前的pc和rsp等值到這裡, 恢復執行時會使用這裡的值
  • atomicstatus: g的當前狀態
  • schedlink: 下一個g, 當g在連結串列結構中會使用
  • preempt: g是否被搶佔中
  • lockedm: g是否要求要回到這個M執行, 有的時候g中斷了恢復會要求使用原來的M執行

M裡面比較重要的成員:

  • g0: 用於排程的特殊g, 排程和執行系統呼叫時會切換到這個g
  • curg: 當前執行的g
  • p: 當前擁有的P
  • nextp: 喚醒M時, M會擁有這個P
  • park: M休眠時使用的訊號量, 喚醒M時會通過它喚醒
  • schedlink: 下一個m, 當m在連結串列結構中會使用
  • mcache: 分配記憶體時使用的本地分配器, 和p.mcache一樣(擁有P時會複製過來)
  • lockedg: lockedm的對應值

P裡面比較重要的成員:

  • status: p的當前狀態
  • link: 下一個p, 當p在連結串列結構中會使用
  • m: 擁有這個P的M
  • mcache: 分配記憶體時使用的本地分配器
  • runqhead: 本地執行佇列的出隊序號
  • runqtail: 本地執行佇列的入隊序號
  • runq: 本地執行佇列的陣列, 可以儲存256個G
  • gfree: G的自由列表, 儲存變為_Gdead後可以複用的G例項
  • gcBgMarkWorker: 後臺GC的worker函式, 如果它存在M會優先執行它
  • gcw: GC的本地工作佇列, 詳細將在下一篇(GC篇)分析

排程器涉及到的結構體除了上面的G、M、P 之外,還有以下,比如全域性的排程器:

type schedt struct {
	// accessed atomically. keep at top to ensure alignment on 32-bit systems.
     // 下面兩個變數需以原子訪問訪問。保持在 struct 頂部,確保其在 32 位系統上可以對齊
	goidgen  uint64
	lastpoll uint64

	lock mutex
    
    // 當修改 nmidle,nmidlelocked,nmsys,nmfreed 這些數值時
    // 需要記得呼叫 checkdead

	midle        muintptr // idle m's waiting for work   空閒的M 佇列。
	nmidle       int32    // number of idle m's waiting for work  當前等待工作的空閒 m 計數
	nmidlelocked int32    // number of locked m's waiting for work  當前等待工作的被 lock 的 m 計數
	mcount       int32    // number of m's that have been created  已經建立的 m 數量
	maxmcount    int32    // maximum number of m's allowed (or die)   允許建立的最大的 m 數量

	ngsys uint32 // number of system goroutines; updated atomically  系統 goroutine 的數量, 原子操作

	pidle      puintptr // idle p's   空閒的 p 佇列
	npidle     uint32
	nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.

	// Global runnable queue.
     // 全域性的可執行 g 佇列
	runqhead guintptr       // 隊頭地址
	runqtail guintptr       // 隊尾地址 
	runqsize int32          // 佇列寬度  

	// Global cache of dead G's.
    // dead G 的全域性緩
	gflock       mutex
	gfreeStack   *g        // 棧中自由g ?
	gfreeNoStack *g        // 堆中自由g ?   
	ngfree       int32

	// Central cache of sudog structs.
    // sudog 結構的集中快取
	sudoglock  mutex
	sudogcache *sudog

	// Central pool of available defer structs of different sizes.
    // 不同大小的可用的 defer struct 的集中快取池
	deferlock mutex
	deferpool [5]*_defer

	gcwaiting  uint32 // gc is waiting to run  gc 等待執行狀態。 作為gc任務被執行期間的輔助標記、停止計數和通知機制
	stopwait   int32
	stopnote   note
	sysmonwait uint32  // 作為 系統檢測任務被執行期間的停止計數和通知機制
	sysmonnote note

	// safepointFn should be called on each P at the next GC
	// safepoint if p.runSafePointFn is set.
    // 應在下一個GC上的每個P上呼叫safepointFn
    // 如果設定了p.runSafePointFn,則為safepoint。
	safePointFn   func(*p)
	safePointWait int32
	safePointNote note

	profilehz int32 // cpu profiling rate   CPU分析率

	procresizetime int64 // nanotime() of last change to gomaxprocs   上次修改 gomaxprocs 的納秒時間
	totaltime      int64 // ∫gomaxprocs dt up to procresizetime
}

全域性排程器,全域性只有一個schedt型別的例項

sudoG 結構體:

// sudog 代表在等待列表裡的 g,比如向 channel 傳送/接收內容時
// 之所以需要 sudog 是因為 g 和同步物件之間的關係是多對多的
// 一個 g 可能會在多個等待佇列中,所以一個 g 可能被打包為多個 sudog
// 多個 g 也可以等待在同一個同步物件上
// 因此對於一個同步物件就會有很多 sudog 了
// sudog 是從一個特殊的池中進行分配的。用 acquireSudog 和 releaseSudog 來分配和釋放 sudog

type sudog struct {
	// The following fields are protected by the hchan.lock of the
	// channel this sudog is blocking on. shrinkstack depends on
	// this for sudogs involved in channel ops.

	g          *g
	selectdone *uint32 // CAS to 1 to win select race (may point to stack)
	next       *sudog
	prev       *sudog
	elem       unsafe.Pointer // data element (may point to stack)

	// The following fields are never accessed concurrently.
	// For channels, waitlink is only accessed by g.
	// For semaphores, all fields (including the ones above)
	// are only accessed when holding a semaRoot lock.

	acquiretime int64
	releasetime int64
	ticket      uint32
	parent      *sudog // semaRoot binary tree
	waitlink    *sudog // g.waiting list or semaRoot
	waittail    *sudog // semaRoot
	c           *hchan // channel
}

 

那麼goroutine的入口是怎麼樣的呢?首先,我們從goroutine是如何被建立的說起,建立goroutine的函式為:newproc 函式 (在 ./src/runtime/proc.go 檔案中),即:使用go命令建立goroutine時, go會把go命令編譯為對runtime.newproc的呼叫

// Create a new g running fn with siz bytes of arguments.
// Put it on the queue of g's waiting to run.
// The compiler turns a go statement into a call to this.
// Cannot split the stack because it assumes that the arguments
// are available sequentially after &fn; they would not be
// copied if a stack split occurred.

// 根據 引數 fn 和 siz 建立一個 g
// 並把它放置入 自由g佇列中等待喚醒
// 編譯器翻譯一個 go 表示式時會呼叫這個函式
// 無法拆分堆疊,因為它假設引數在 &fn 之後順序可用; 如果發生堆疊拆分,則不會複製它們。

//    新建一個goroutine,
//    用fn + PtrSize 獲取第一個引數的地址,也就是argp
//    用siz - 8 獲取pc地址

//go:nosplit
func newproc(siz int32, fn *funcval) {
    // add 是一個指標運算,跳過函式指標
    // 把棧上的引數起始地址找到
	argp := add(unsafe.Pointer(&fn), sys.PtrSize)

    // getcallerpc返回的是 呼叫函式之後的那條程式指令的地址,
    // 即callee函式返回時要執行的下一條指令的地址
	pc := getcallerpc(unsafe.Pointer(&siz))
    
    // 用g0的棧建立G物件
	systemstack(func() {
		newproc1(fn, (*uint8)(argp), siz, 0, pc)
	})
}


// 結構體 funcval
// funcval 是一個變長結構,第一個成員是函式指標
// 所以上面的 add 是跳過這個 fn
type funcval struct {
	fn uintptr
	// variable-size, fn-specific data here   這裡的可變大小,特定於fn的資料
}

runtime.newproc函式中只做了三件事:

  • 計算額外引數的地址 argp
  • 獲取呼叫端的地址(返回地址) pc
  • 使用systemstack呼叫 newproc1 函式

systemstack 會切換當前的 g g0, 並且使用g0的棧空間, 然後呼叫傳入的函式, 再切換回原來的g和原來的棧空間
切換到g0後會假裝返回地址是mstart, 這樣traceback的時候可以在mstart停止。
這裡傳給systemstack的是一個閉包, 呼叫時會把閉包的地址放到暫存器rdx, 具體可以參考上面對閉包的分析。

下面我們在主要來看看  newproc1 函式做了什麼:

// Create a new g running fn with narg bytes of arguments starting
// at argp and returning nret bytes of results.  callerpc is the
// address of the go statement that created this. The new g is put
// on the queue of g's waiting to run.

// 根據函式引數和函式地址,建立一個新的G,然後將這個G加入佇列等待執行
func newproc1(fn *funcval, argp *uint8, narg int32, nret int32, callerpc uintptr) *g {

    // 先獲取 當前 g,其實這裡獲取到的是 g0
	_g_ := getg()
    
    // 判斷下 func 的實現是否為空
	if fn == nil {
		_g_.m.throwing = -1 // do not dump full stacks
		throw("go of nil func value")
	}
    
    // 設定g對應的m的locks++, 禁止搶佔
	_g_.m.locks++ // disable preemption because it can be holding p in a local var   禁用搶佔,因為它可以在本地var中儲存p
	siz := narg + nret
	siz = (siz + 7) &^ 7

	// We could allocate a larger initial stack if necessary.
	// Not worth it: this is almost always an error.
	// 4*sizeof(uintreg): extra space added below
	// sizeof(uintreg): caller's LR (arm) or return address (x86, in gostartcall).
	if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
		throw("newproc: function arguments too large for new goroutine")
	}

	_p_ := _g_.m.p.ptr()
	newg := gfget(_p_)
	if newg == nil {
		newg = malg(_StackMin)
		casgstatus(newg, _Gidle, _Gdead)
		allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
	}
	if newg.stack.hi == 0 {
		throw("newproc1: newg missing stack")
	}

	if readgstatus(newg) != _Gdead {
		throw("newproc1: new g is not Gdead")
	}

	totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
	totalSize += -totalSize & (sys.SpAlign - 1)                  // align to spAlign
	sp := newg.stack.hi - totalSize
	spArg := sp
	if usesLR {
		// caller's LR
		*(*uintptr)(unsafe.Pointer(sp)) = 0
		prepGoExitFrame(sp)
		spArg += sys.MinFrameSize
	}
	if narg > 0 {
		memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))
		// This is a stack-to-stack copy. If write barriers
		// are enabled and the source stack is grey (the
		// destination is always black), then perform a
		// barrier copy. We do this *after* the memmove
		// because the destination stack may have garbage on
		// it.
		if writeBarrier.needed && !_g_.m.curg.gcscandone {
			f := findfunc(fn.fn)
			stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps))
			// We're in the prologue, so it's always stack map index 0.
			bv := stackmapdata(stkmap, 0)
			bulkBarrierBitmap(spArg, spArg, uintptr(narg), 0, bv.bytedata)
		}
	}

	memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
	newg.sched.sp = sp
	newg.stktopsp = sp
	newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
	newg.sched.g = guintptr(unsafe.Pointer(newg))
	gostartcallfn(&newg.sched, fn)
	newg.gopc = callerpc
	newg.startpc = fn.fn
	if _g_.m.curg != nil {
		newg.labels = _g_.m.curg.labels
	}
	if isSystemGoroutine(newg) {
		atomic.Xadd(&sched.ngsys, +1)
	}
	newg.gcscanvalid = false
	casgstatus(newg, _Gdead, _Grunnable)

	if _p_.goidcache == _p_.goidcacheend {
		// Sched.goidgen is the last allocated id,
		// this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
		// At startup sched.goidgen=0, so main goroutine receives goid=1.
		_p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
		_p_.goidcache -= _GoidCacheBatch - 1
		_p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
	}
	newg.goid = int64(_p_.goidcache)
	_p_.goidcache++
	if raceenabled {
		newg.racectx = racegostart(callerpc)
	}
	if trace.enabled {
		traceGoCreate(newg, newg.startpc)
	}
	runqput(_p_, newg, true)

	if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
		wakep()
	}
	_g_.m.locks--
	if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack
		_g_.stackguard0 = stackPreempt
	}
	return newg
}

先大致看下newproc1 函式邏輯流程:

newproc1 --> newg
newg[gfget] --> nil{is nil?}
nil -->|yes|E[init stack]
nil -->|no|C[malg]
C --> D[set g status=> idle->dead]
D --> allgadd
E --> G[set g status=> dead-> runnable]
allgadd --> G
G --> runqput

runtime.newproc1的處理如下:

  • 呼叫getg獲取當前的g, 會編譯為讀取FS暫存器(TLS), 這裡會獲取到g0
  • 設定g對應的m的locks++, 禁止搶佔
  • 獲取m擁有的p
  • 新建一個g
    • 首先呼叫gfget從p.gfree獲取g, 如果之前有g被回收在這裡就可以複用
    • 獲取不到時呼叫malg分配一個g, 初始的棧空間大小是2K
    • 需要先設定g的狀態為已中止(_Gdead), 這樣gc不會去掃描這個g的未初始化的棧
  • 把引數複製到g的棧上
  • 把返回地址複製到g的棧上, 這裡的返回地址是goexit, 表示呼叫完目標函式後會呼叫goexit
  • 設定g的排程資料(sched)
    • 設定sched.sp等於引數+返回地址後的rsp地址
    • 設定sched.pc等於目標函式的地址, 檢視gostartcallfngostartcall
    • 設定sched.g等於g
  • 設定g的狀態為待執行(_Grunnable)
  • 呼叫runqput把g放到執行佇列
    • 首先隨機把g放到p.runnext, 如果放到runnext則入隊原來在runnext的g
    • 然後嘗試把g放到P的"本地執行佇列"
    • 如果本地執行佇列滿了則呼叫runqputslow把g放到"全域性執行佇列"
      • runqputslow會把本地執行佇列中一半的g放到全域性執行佇列, 這樣下次就可以繼續用快速的本地執行隊列了
  • 如果當前有空閒的P, 但是無自旋的M(nmspinning等於0), 並且主函式已執行則喚醒或新建一個M
    • 這一步非常重要, 用於保證當前有足夠的M執行G, 具體請檢視上面的"空閒M連結串列"
    • 喚醒或新建一個M會通過wakep函式
      • 首先交換nmspinning到1, 成功再繼續, 多個執行緒同時執行wakep只有一個會繼續
      • 呼叫startm函式
        • 呼叫pidleget從"空閒P連結串列"獲取一個空閒的P
        • 呼叫mget從"空閒M連結串列"獲取一個空閒的M
        • 如果沒有空閒的M, 則呼叫newm新建一個M
          • newm會新建一個m的例項, m的例項包含一個g0, 然後呼叫newosproc動一個系統執行緒
          • newosproc會呼叫syscall clone建立一個新的執行緒
          • 執行緒建立後會設定TLS, 設定TLS中當前的g為g0, 然後執行mstart
        • 呼叫notewakeup(&mp.park)喚醒執行緒

建立goroutine的流程就這麼多了, 接下來看看M是如何排程的.

 

 

(未完,疲憊中.............)