本篇文章來分析一下 Go 語言 HTTP 標準庫是如何實現的。

轉載請宣告出處哦~,本篇文章釋出於luozhiyun的部落格:https://www.luozhiyun.com/archives/561

本文使用的go的原始碼1.15.7

基於HTTP構建的服務標準模型包括兩個端,客戶端(Client)和服務端(Server)。HTTP 請求從客戶端發出,服務端接受到請求後進行處理然後將響應返回給客戶端。所以http伺服器的工作就在於如何接受來自客戶端的請求,並向客戶端返回響應。

一個典型的 HTTP 服務應該如圖所示:

HTTP client

在 Go 中可以直接通過 HTTP 包的 Get 方法來發起相關請求資料,一個簡單例子:

func main() {
resp, err := http.Get("http://httpbin.org/get?name=luozhiyun&age=27")
if err != nil {
fmt.Println(err)
return
}
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)
fmt.Println(string(body))
}

我們下面通過這個例子來進行分析。

HTTP 的 Get 方法會呼叫到 DefaultClient 的 Get 方法,DefaultClient 是 Client 的一個空例項,所以最後會呼叫到 Client 的 Get 方法:

Client 結構體

type Client struct {
Transport RoundTripper
CheckRedirect func(req *Request, via []*Request) error
Jar CookieJar
Timeout time.Duration
}

Client 結構體總共由四個欄位組成:

Transport:表示 HTTP 事務,用於處理客戶端的請求連線並等待服務端的響應;

CheckRedirect:用於指定處理重定向的策略;

Jar:用於管理和儲存請求中的 cookie;

Timeout:指定客戶端請求的最大超時時間,該超時時間包括連線、任何的重定向以及讀取相應的時間;

初始化請求

func (c *Client) Get(url string) (resp *Response, err error) {
// 根據方法名、URL 和請求體構建請求
req, err := NewRequest("GET", url, nil)
if err != nil {
return nil, err
}
// 執行請求
return c.Do(req)
}

我們要發起一個請求首先需要根據請求型別構建一個完整的請求頭、請求體、請求引數。然後才是根據請求的完整結構來執行請求。

NewRequest 初始化請求

NewRequest 會呼叫到 NewRequestWithContext 函式上。這個函式會根據請求返回一個 Request 結構體,它裡面包含了一個 HTTP 請求所有資訊。

Request

Request 結構體有很多欄位,我這裡列舉幾個大家比較熟悉的欄位:

NewRequestWithContext

func NewRequestWithContext(ctx context.Context, method, url string, body io.Reader) (*Request, error) {
...
// parse url
u, err := urlpkg.Parse(url)
if err != nil {
return nil, err
}
rc, ok := body.(io.ReadCloser)
if !ok && body != nil {
rc = ioutil.NopCloser(body)
}
u.Host = removeEmptyPort(u.Host)
req := &Request{
ctx: ctx,
Method: method,
URL: u,
Proto: "HTTP/1.1",
ProtoMajor: 1,
ProtoMinor: 1,
Header: make(Header),
Body: rc,
Host: u.Host,
}
...
return req, nil
}

NewRequestWithContext 函式會將請求封裝成一個 Request 結構體並返回。

準備 http 傳送請求

如上圖所示,Client 呼叫 Do 方法處理髮送請求最後會呼叫到 send 函式中。

func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
resp, didTimeout, err = send(req, c.transport(), deadline)
if err != nil {
return nil, didTimeout, err
}
...
return resp, nil, nil
}

Transport

Client 的 send 方法在呼叫 send 函式進行下一步的處理前會先呼叫 transport 方法獲取 DefaultTransport 例項,該例項如下:

var DefaultTransport RoundTripper = &Transport{
// 定義 HTTP 代理策略
Proxy: ProxyFromEnvironment,
DialContext: (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
DualStack: true,
}).DialContext,
ForceAttemptHTTP2: true,
// 最大空閒連線數
MaxIdleConns: 100,
// 空閒連線超時時間
IdleConnTimeout: 90 * time.Second,
// TLS 握手超時時間
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
}

Transport 實現 RoundTripper 介面,該結構體會發送 http 請求並等待響應。

type RoundTripper interface {
RoundTrip(*Request) (*Response, error)
}

從 RoundTripper 介面我們也可以看出,該介面定義的 RoundTrip 方法會具體的處理請求,處理完畢之後會響應 Response。

回到我們上面的 Client 的 send 方法中,它會呼叫 send 函式,這個函式主要邏輯都交給 Transport 的 RoundTrip 方法來執行。

RoundTrip 會呼叫到 roundTrip 方法中:

func (t *Transport) roundTrip(req *Request) (*Response, error) {
t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
ctx := req.Context()
trace := httptrace.ContextClientTrace(ctx)
...
for {
select {
case <-ctx.Done():
req.closeBody()
return nil, ctx.Err()
default:
} // 封裝請求
treq := &transportRequest{Request: req, trace: trace, cancelKey: cancelKey}
cm, err := t.connectMethodForRequest(treq)
if err != nil {
req.closeBody()
return nil, err
}
// 獲取連線
pconn, err := t.getConn(treq, cm)
if err != nil {
t.setReqCanceler(cancelKey, nil)
req.closeBody()
return nil, err
} // 等待響應結果
var resp *Response
if pconn.alt != nil {
// HTTP/2 path.
t.setReqCanceler(cancelKey, nil) // not cancelable with CancelRequest
resp, err = pconn.alt.RoundTrip(req)
} else {
resp, err = pconn.roundTrip(treq)
}
if err == nil {
resp.Request = origReq
return resp, nil
}
...
}
}

roundTrip 方法會做兩件事情:

  1. 呼叫 Transport 的 getConn 方法獲取連線;
  2. 在獲取到連線後,呼叫 persistConn 的 roundTrip 方法等待請求響應結果;

獲取連線 getConn

getConn 有兩個階段:

  1. 呼叫 queueForIdleConn 獲取空閒 connection;
  2. 呼叫 queueForDial 等待建立新的 connection;

func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {	req := treq.Request	trace := treq.trace	ctx := req.Context()	if trace != nil && trace.GetConn != nil {		trace.GetConn(cm.addr())	}		// 將請求封裝成 wantConn 結構體	w := &wantConn{		cm:         cm,		key:        cm.key(),		ctx:        ctx,		ready:      make(chan struct{}, 1),		beforeDial: testHookPrePendingDial,		afterDial:  testHookPostPendingDial,	}	defer func() {		if err != nil {			w.cancel(t, err)		}	}() 	// 獲取空閒連線	if delivered := t.queueForIdleConn(w); delivered {		pc := w.pc		...		t.setReqCanceler(treq.cancelKey, func(error) {})		return pc, nil	} 	// 建立連線	t.queueForDial(w) 	select {	// 獲取到連線後進入該分支	case <-w.ready:		...		return w.pc, w.err	...}

獲取空閒連線 queueForIdleConn

成功獲取到空閒 connection:

成功獲取 connection 分為如下幾步:

  1. 根據當前的請求的地址去空閒 connection 字典中檢視存不存在空閒的 connection 列表;
  2. 如果能獲取到空閒的 connection 列表,那麼獲取到列表的最後一個 connection;
  3. 返回;

獲取不到空閒 connection:

當獲取不到空閒 connection 時:

  1. 根據當前的請求的地址去空閒 connection 字典中檢視存不存在空閒的 connection 列表;
  2. 不存在該請求的 connection 列表,那麼將該 wantConn 加入到 等待獲取空閒 connection 字典中;

從上面的圖解應該就很能看出這一步會怎麼操作了,這裡簡要的分析一下程式碼,讓大家更清楚裡面的邏輯:

func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {	if t.DisableKeepAlives {		return false	}	t.idleMu.Lock()	defer t.idleMu.Unlock() 	t.closeIdle = false	if w == nil { 		return false	} 	// 計算空閒連線超時時間	var oldTime time.Time	if t.IdleConnTimeout > 0 {		oldTime = time.Now().Add(-t.IdleConnTimeout)	}	// Look for most recently-used idle connection.	// 找到key相同的 connection 列表	if list, ok := t.idleConn[w.key]; ok {		stop := false		delivered := false		for len(list) > 0 && !stop {			// 找到connection列表最後一個			pconn := list[len(list)-1] 			// 檢查這個 connection 是不是等待太久了			tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)			if tooOld { 				go pconn.closeConnIfStillIdle()			}			// 該 connection 被標記為 broken 或 閒置太久 continue			if pconn.isBroken() || tooOld { 				list = list[:len(list)-1]				continue			}			// 嘗試將該 connection 寫入到 w 中			delivered = w.tryDeliver(pconn, nil)			if delivered {				// 操作成功,需要將 connection 從空閒列表中移除				if pconn.alt != nil { 				} else { 					t.idleLRU.remove(pconn)					list = list[:len(list)-1]				}			}			stop = true		}		if len(list) > 0 {			t.idleConn[w.key] = list		} else {			// 如果該 key 對應的空閒列表不存在,那麼將該key從字典中移除			delete(t.idleConn, w.key)		}		if stop {			return delivered		}	} 	// 如果找不到空閒的 connection	if t.idleConnWait == nil {		t.idleConnWait = make(map[connectMethodKey]wantConnQueue)	}  // 將該 wantConn 加入到 等待獲取空閒 connection 字典中	q := t.idleConnWait[w.key] 	q.cleanFront()	q.pushBack(w)	t.idleConnWait[w.key] = q	return false}

上面的註釋已經很清楚了,我這裡就不再解釋了。

建立連線 queueForDial

在獲取不到空閒連線之後,會嘗試去建立連線,從上面的圖大致可以看到,總共分為以下幾個步驟:

  1. 在呼叫 queueForDial 方法的時候會校驗 MaxConnsPerHost 是否未設定或已達上限;

    1. 檢驗不通過則將當前的請求放入到 connsPerHostWait 等待字典中;
  2. 如果校驗通過那麼會非同步的呼叫 dialConnFor 方法建立連線;
  3. dialConnFor 方法首先會呼叫 dialConn 方法建立 TCP 連線,然後啟動兩個非同步執行緒來處理讀寫資料,然後呼叫 tryDeliver 將連線繫結到 wantConn 上面。

下面進行程式碼分析:

func (t *Transport) queueForDial(w *wantConn) {	w.beforeDial()	// 小於零說明無限制,非同步建立連線	if t.MaxConnsPerHost <= 0 {		go t.dialConnFor(w)		return	}	t.connsPerHostMu.Lock()	defer t.connsPerHostMu.Unlock()	// 每個 host 建立的連線數沒達到上限,非同步建立連線	if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost {		if t.connsPerHost == nil {			t.connsPerHost = make(map[connectMethodKey]int)		}		t.connsPerHost[w.key] = n + 1		go t.dialConnFor(w)		return	}	//每個 host 建立的連線數已達到上限,需要進入等待佇列	if t.connsPerHostWait == nil {		t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue)	}	q := t.connsPerHostWait[w.key]	q.cleanFront()	q.pushBack(w)	t.connsPerHostWait[w.key] = q}

這裡主要進行引數校驗,如果最大連線數限制為零,亦或是每個 host 建立的連線數沒達到上限,那麼直接非同步建立連線。

dialConnFor

func (t *Transport) dialConnFor(w *wantConn) {	defer w.afterDial()	// 建立連線	pc, err := t.dialConn(w.ctx, w.cm)	// 連線繫結 wantConn	delivered := w.tryDeliver(pc, err)	// 建立連線成功,但是繫結 wantConn 失敗	// 那麼將該連線放置到空閒連線字典或呼叫 等待獲取空閒 connection 字典 中的元素執行	if err == nil && (!delivered || pc.alt != nil) { 		t.putOrCloseIdleConn(pc)	}	if err != nil {		t.decConnsPerHost(w.key)	}}

dialConnFor 會呼叫 dialConn 進行 TCP 連線建立,建立完畢之後呼叫 tryDeliver 方法和 wantConn 進行繫結。

dialConn

func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {	// 建立連線結構體	pconn = &persistConn{		t:             t,		cacheKey:      cm.key(),		reqch:         make(chan requestAndChan, 1),		writech:       make(chan writeRequest, 1),		closech:       make(chan struct{}),		writeErrCh:    make(chan error, 1),		writeLoopDone: make(chan struct{}),	}	...	if cm.scheme() == "https" && t.hasCustomTLSDialer() {		...	} else {		// 建立 tcp 連線		conn, err := t.dial(ctx, "tcp", cm.addr())		if err != nil {			return nil, wrapErr(err)		}		pconn.conn = conn 	} 	...	if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {		if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {			alt := next(cm.targetAddr, pconn.conn.(*tls.Conn))			if e, ok := alt.(http2erringRoundTripper); ok {				// pconn.conn was closed by next (http2configureTransport.upgradeFn).				return nil, e.err			}			return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil		}	}	pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())	pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())	//為每個連線非同步處理讀寫資料	go pconn.readLoop()	go pconn.writeLoop()	return pconn, nil}

這裡會根據 schema 的不同設定不同的連線配置,我上面顯示的是我們常用的 HTTP 連線的建立過程。對於 HTTP 來說會建立 tcp 連線,然後為連線非同步處理讀寫資料,最後將建立好的連線返回。

等待響應

這一部分的內容會稍微複雜一些,但確實非常的有趣。

在建立連線的時候會初始化兩個 channel :writech 負責寫入請求資料,reqch負責讀取響應資料。我們在上面建立連線的時候,也提到了會為連線建立兩個非同步迴圈 readLoop 和 writeLoop 來負責處理讀寫資料。

在獲取到連線之後,會呼叫連線的 roundTrip 方法,它首先會將請求資料寫入到 writech 管道中,writeLoop 接收到資料之後就會處理請求。

然後 roundTrip 會將 requestAndChan 結構體寫入到 reqch 管道中,然後 roundTrip 會迴圈等待。readLoop 讀取到響應資料之後就會通過 requestAndChan 結構體中儲存的管道將資料封裝成 responseAndError 結構體回寫,這樣 roundTrip 就可以接受到響應資料結束迴圈等待並返回。

roundTrip

func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {	...	writeErrCh := make(chan error, 1)	// 將請求資料寫入到 writech 管道中	pc.writech <- writeRequest{req, writeErrCh, continueCh}	// 用於接收響應的管道	resc := make(chan responseAndError)	// 將用於接收響應的管道封裝成 requestAndChan 寫入到 reqch 管道中	pc.reqch <- requestAndChan{		req:        req.Request,		cancelKey:  req.cancelKey,		ch:         resc,		...	}	...	for {		testHookWaitResLoop()		select { 		// 接收到響應資料		case re := <-resc:			if (re.res == nil) == (re.err == nil) {				panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))			}			if debugRoundTrip {				req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)			}			if re.err != nil {				return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)			}			// 返回響應資料			return re.res, nil		...	}}

這裡會封裝好 writeRequest 作為傳送請求的資料,並將用於接收響應的管道封裝成 requestAndChan 寫入到 reqch 管道中,然後迴圈等待接受響應。

然後 writeLoop 會進行請求資料 writeRequest :

func (pc *persistConn) writeLoop() {	defer close(pc.writeLoopDone)	for {		select {		case wr := <-pc.writech:			startBytesWritten := pc.nwrite			// 向 TCP 連線中寫入資料,併發送至目標伺服器			err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh))			...		case <-pc.closech:			return		}	}}

這裡會將從 writech 管道中獲取到的資料寫入到 TCP 連線中,併發送至目標伺服器。

readLoop

func (pc *persistConn) readLoop() {	closeErr := errReadLoopExiting // default value, if not changed below	defer func() {		pc.close(closeErr)		pc.t.removeIdleConn(pc)	}()	... 	alive := true	for alive {		pc.readLimit = pc.maxHeaderResponseSize()		// 獲取 roundTrip 傳送的結構體		rc := <-pc.reqch		trace := httptrace.ContextClientTrace(rc.req.Context())		var resp *Response		if err == nil {			// 讀取資料			resp, err = pc.readResponse(rc, trace)		} else {			err = transportReadFromServerError{err}			closeErr = err		}		...  		// 將響應資料寫回到管道中		select {		case rc.ch <- responseAndError{res: resp}:		case <-rc.callerGone:			return		}		...	}}

這裡是從 TCP 連線中讀取到對應的請求響應資料,通過 roundTrip 傳入的管道再回寫,然後 roundTrip 就會接受到資料並獲取的響應資料返回。

http server

我這裡繼續以一個簡單的例子作為開頭:

func HelloHandler(w http.ResponseWriter, r *http.Request) {	fmt.Fprintf(w, "Hello World")}func main () {	http.HandleFunc("/", HelloHandler)	http.ListenAndServe(":8000", nil)}

在實現上面我先用一張圖進行簡要的介紹一下:

其實我們從上面例子的方法名就可以知道一些大致的步驟:

  1. 註冊處理器到一個 hash 表中,可以通過鍵值路由匹配;
  2. 註冊完之後就是開啟迴圈監聽,每監聽到一個連線就會建立一個 Goroutine;
  3. 在建立好的 Goroutine 裡面會迴圈的等待接收請求資料,然後根據請求的地址去處理器路由表中匹配對應的處理器,然後將請求交給處理器處理;

註冊處理器

處理器的註冊如上面的例子所示,是通過呼叫 HandleFunc 函式來實現的。

HandleFunc 函式會一直呼叫到 ServeMux 的 Handle 方法中。

func (mux *ServeMux) Handle(pattern string, handler Handler) {	mux.mu.Lock()	defer mux.mu.Unlock()	...	e := muxEntry{h: handler, pattern: pattern}	mux.m[pattern] = e	if pattern[len(pattern)-1] == '/' {		mux.es = appendSorted(mux.es, e)	}	if pattern[0] != '/' {		mux.hosts = true	}}

Handle 會根據路由作為 hash 表的鍵來儲存 muxEntry 物件,muxEntry封裝了 pattern 和 handler。如果路由表示式以'/'結尾,則將對應的muxEntry物件加入到[]muxEntry中。

hash 表是用於路由精確匹配,[]muxEntry用於部分匹配。

監聽

監聽是通過呼叫 ListenAndServe 函式,裡面會呼叫 server 的 ListenAndServe 方法:

func (srv *Server) ListenAndServe() error {	if srv.shuttingDown() {		return ErrServerClosed	}	addr := srv.Addr	if addr == "" {		addr = ":http"	}    // 監聽埠	ln, err := net.Listen("tcp", addr)	if err != nil {		return err	}    // 迴圈接收監聽到的網路請求	return srv.Serve(ln)}

Serve

func (srv *Server) Serve(l net.Listener) error { 	...	baseCtx := context.Background()  	ctx := context.WithValue(baseCtx, ServerContextKey, srv)	for {		// 接收 listener 過來的網路連線		rw, err := l.Accept()		... 		tempDelay = 0		c := srv.newConn(rw)		c.setState(c.rwc, StateNew) 		// 建立協程處理連線		go c.serve(connCtx)	}}

Serve 這個方法裡面會用一個迴圈去接收監聽到的網路連線,然後建立協程處理連線。所以難免就會有一個問題,如果併發很高的話,可能會一次性建立太多協程,導致處理不過來的情況。

處理請求

處理請求是通過為每個連線建立 goroutine 來處理對應的請求:

func (c *conn) serve(ctx context.Context) {	c.remoteAddr = c.rwc.RemoteAddr().String()	ctx = context.WithValue(ctx, LocalAddrContextKey, c.rwc.LocalAddr()) 	... 	ctx, cancelCtx := context.WithCancel(ctx)	c.cancelCtx = cancelCtx	defer cancelCtx() 	c.r = &connReader{conn: c}	c.bufr = newBufioReader(c.r)	c.bufw = newBufioWriterSize(checkConnErrorWriter{c}, 4<<10)  	for {		// 讀取請求		w, err := c.readRequest(ctx) 		... 		// 根據請求路由呼叫處理器處理請求		serverHandler{c.server}.ServeHTTP(w, w.req)		w.cancelCtx()		if c.hijacked() {			return		}		w.finishRequest() 		...	}}

當一個連線建立之後,該連線中所有的請求都將在這個協程中進行處理,直到連線被關閉。在 for 迴圈裡面會迴圈呼叫 readRequest 讀取請求進行處理。

請求處理是通過呼叫 ServeHTTP 進行的:

type serverHandler struct {   srv *Server}func (sh serverHandler) ServeHTTP(rw ResponseWriter, req *Request) {	handler := sh.srv.Handler	if handler == nil {		handler = DefaultServeMux	}	if req.RequestURI == "*" && req.Method == "OPTIONS" {		handler = globalOptionsHandler{}	}	handler.ServeHTTP(rw, req)}

serverHandler 其實就是 Server 包裝了一層。這裡的 sh.srv.Handler引數實際上是傳入的 ServeMux 例項,所以這裡最後會呼叫到 ServeMux 的 ServeHTTP 方法。

最終會通過 handler 呼叫到 match 方法進行路由匹配:

func (mux *ServeMux) match(path string) (h Handler, pattern string) {	v, ok := mux.m[path]	if ok {		return v.h, v.pattern	} 	for _, e := range mux.es {		if strings.HasPrefix(path, e.pattern) {			return e.h, e.pattern		}	}	return nil, ""}

這個方法裡首先會利用進行精確匹配,如果匹配成功那麼直接返回;匹配不成功,那麼會根據 []muxEntry中儲存的和當前路由最接近的已註冊的父節點路由進行匹配,否則繼續匹配下一個父節點路由,直到根路由/。最後會呼叫對應的處理器進行處理。

Reference

https://cloud.tencent.com/developer/article/1515297

https://duyanghao.github.io/http-transport/

https://draveness.me/golang/docs/part4-advanced/ch09-stdlib/golang-net-http

https://laravelacademy.org/post/21003

https://segmentfault.com/a/1190000021653550