1. 程式人生 > >Golang-gopark函式和goready函式原理分析

Golang-gopark函式和goready函式原理分析

Golang-gopark函式和goready函式原理分析

前面介紹的scheduler和channel裡面都與gopark和goready這兩個函式緊密相關,但是站在上層可以理解這兩個函式的作用,但是出於對原始碼探索,我們要明白這兩個函式不僅僅做了啥,還要知道怎麼做的。本文主要內容是從底層原始碼分析這兩個函式原理:

  1. gopark函式
  2. goready函式

gopark函式

gopark函式在協程的實現上扮演著非常重要的角色,用於協程的切換,協程切換的原因一般有以下幾種情況:

  1. 系統呼叫;
  2. channel讀寫條件不滿足;
  3. 搶佔式排程時間片結束;

下面我們來研究一下gopark函式是怎麼實現協程切換的。

先看看原始碼:

func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
	if reason != waitReasonSleep {
		checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
} mp := acquirem() gp := mp.curg status := readgstatus(gp) if status != _Grunning && status != _Gscanrunning { throw("gopark: bad g status") } mp.waitlock = lock mp.waitunlockf = *(*unsafe.Pointer)(unsafe.Pointer(&unlockf)) gp.waitreason = reason mp.waittraceev = traceEv mp.
waittraceskip = traceskip releasem(mp) // can't do anything that might move the G between Ms here. mcall(park_m) }

原始碼裡面最重要的一行就是呼叫 mcall(park_m) 函式,mcall在golang需要進行協程切換時被呼叫,做的主要工作是:

  1. 切換當前執行緒的堆疊從g的堆疊切換到g0的堆疊;
  2. 並在g0的堆疊上執行新的函式fn(g);
  3. 儲存當前協程的資訊( PC/SP儲存到g->sched),當後續對當前協程呼叫goready函式時候能夠恢復現場;

mcall函式執行原理

mcall的函式原型是:

func mcall(fn func(*g))

這裡函式fn的引數g指的是在呼叫mcall之前正在執行的協程。

我們前面說到,mcall的主要作用是協程切換,它將當前正在執行的協程狀態儲存起來,然後在m->g0的堆疊上呼叫新的函式。 在新的函式內會將之前執行的協程放棄,然後呼叫一次schedule()來挑選新的協程執行。(也就是在fn函式裡面會呼叫一次schedule()函式進行一次scheduler的重新排程,讓m去執行其餘的goroutine)

mcall函式是通過彙編實現的,在asm_amd64.s裡面有64位機的實現,原始碼如下:

// func mcall(fn func(*g))
// Switch to m->g0's stack, call fn(g).
// Fn must never return. It should gogo(&g->sched)
// to keep running g.
TEXT runtime·mcall(SB), NOSPLIT, $0-8
	//DI中儲存引數fn 
	MOVQ	fn+0(FP), DI
	
	get_tls(CX)
	// 獲取當前正在執行的協程g資訊 
	// 將其狀態儲存在g.sched變數 
	MOVQ	g(CX), AX	// save state in g->sched
	MOVQ	0(SP), BX	// caller's PC
	MOVQ	BX, (g_sched+gobuf_pc)(AX)
	LEAQ	fn+0(FP), BX	// caller's SP
	MOVQ	BX, (g_sched+gobuf_sp)(AX)
	MOVQ	AX, (g_sched+gobuf_g)(AX)
	MOVQ	BP, (g_sched+gobuf_bp)(AX)

	// switch to m->g0 & its stack, call fn
	MOVQ	g(CX), BX
	MOVQ	g_m(BX), BX
	MOVQ	m_g0(BX), SI
	CMPQ	SI, AX	// if g == m->g0 call badmcall
	JNE	3(PC)
	MOVQ	$runtime·badmcall(SB), AX
	JMP	AX
	MOVQ	SI, g(CX)	// g = m->g0
	// 切換到m->g0堆疊 
	MOVQ	(g_sched+gobuf_sp)(SI), SP	// sp = m->g0->sched.sp
	// 引數AX為之前執行的協程g 
	PUSHQ	AX
	MOVQ	DI, DX
	MOVQ	0(DI), DI
	 // 在m->g0堆疊上執行函式fn 
	CALL	DI
	POPQ	AX
	MOVQ	$runtime·badmcall2(SB), AX
	JMP	AX
	RET

上面的彙編程式碼我也不是很懂,但是能夠大致能夠推斷出主要做的事情:

  1. 儲存當前goroutine的狀態(PC/SP)到g->sched中,方便下次排程;
  2. 切換到m->g0的棧;
  3. 然後g0的堆疊上呼叫fn;

回到gopark函式裡面,我們知道mcall會切換到m->g0的棧,然後執行park_m函式,下面看一下park_m函式原始碼:

func park_m(gp *g) {
	// g0
	_g_ := getg()

	if trace.enabled {
		traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
	}
	//執行緒安全更新gp的狀態,置為_Gwaiting
	casgstatus(gp, _Grunning, _Gwaiting)
	// 移除gp與m的繫結關係
	dropg()

	if _g_.m.waitunlockf != nil {
		fn := *(*func(*g, unsafe.Pointer) bool)(unsafe.Pointer(&_g_.m.waitunlockf))
		ok := fn(gp, _g_.m.waitlock)
		_g_.m.waitunlockf = nil
		_g_.m.waitlock = nil
		if !ok {
			if trace.enabled {
				traceGoUnpark(gp, 2)
			}
			casgstatus(gp, _Gwaiting, _Grunnable)
			execute(gp, true) // Schedule it back, never returns.
		}
	}
	// 重新做一次排程
	schedule()
}

park_m函式主要做的幾件事情就是:

  1. 執行緒安全更新goroutine的狀態,置為_Gwaiting 等待狀態;
  2. 解除goroutine與OS thread的繫結關係;
  3. 呼叫schedule()函式,排程器會重新排程選擇一個goroutine去執行;

schedule函式裡面主要呼叫路徑就是:

schedule()>execute()>gogo()

gogo函式的作用正好相反,用來從gobuf中恢復出協程執行狀態並跳轉到上一次指令處繼續執行。因此,其程式碼也相對比較容易理解,當然,其實現也是通過彙編程式碼實現的。

goready函式

goready函式相比gopark函式來說簡單一些,主要功能就是喚醒某一個goroutine,該協程轉換到runnable的狀態。

func goready(gp *g, traceskip int) {
	// 切換到g0的棧
	systemstack(func() {
		ready(gp, traceskip, true)
	})
}

該函式主要就是切換到g0的棧空間然後執行ready函式。

下面我們看看ready函式原始碼(刪除非主流程程式碼):

// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {
	status := readgstatus(gp)

	// Mark runnable.
	_g_ := getg()//g0
	_g_.m.locks++ // disable preemption because it can be holding p in a local var
	if status&^_Gscan != _Gwaiting {
		dumpgstatus(gp)
		throw("bad g->status in ready")
	}

	//設定gp狀態為runnable,然後加入到P的可執行local queue;
	casgstatus(gp, _Gwaiting, _Grunnable)
	runqput(_g_.m.p.ptr(), gp, next)
	if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
		wakep()
	}
	_g_.m.locks--
	if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in Case we've cleared it in newstack
		_g_.stackguard0 = stackPreempt
	}
}

程式碼的核心流程最主要工作就是將gp(goroutine)的狀態機切換到runnnable,然後加入到P的區域性排程器的local queue,等待P進行排程。

所以這裡有一點需要我們注意到的是,對一個協程呼叫goready函式,這個協程不是可以馬上就執行的,而是要等待排程器的排程執行。