1. 程式人生 > >Golang網路:核心API實現剖析(一)

Golang網路:核心API實現剖析(一)

這一章節我們將詳細描述網路關鍵API的實現,主要包括Listen、Accept、Read、Write等。 另外,為了突出關鍵流程,我們選擇忽略所有的錯誤。這樣可以使得程式碼看起來更為簡單。 而且我們只關注tcp協議實現,udp和unix socket不是我們關心的。

Listen

func Listen(net, laddr string) (Listener, error) {
   la, err := resolveAddr("listen", net, laddr, noDeadline)
   ......
   switch la := la.toAddr().(type) {
   case *TCPAddr:
       l, err = ListenTCP(net, la)
   case *UnixAddr:
       ......
   }
  ......
}

// 對於tcp協議,返回的的是TCPListener
func ListenTCP(net string, laddr *TCPAddr) (*TCPListener, error) {
   ......
   fd, err := internetSocket(net, laddr, nil, noDeadline, syscall.SOCK_STREAM, 0, "listen")
   ......
   return &TCPListener{fd}, nil
}

func internetSocket(net string, laddr, raddr sockaddr, deadline time.Time, sotype, proto int, mode string) (fd *netFD, err error) {
   ......
   return socket(net, family, sotype, proto, ipv6only, laddr, raddr, deadline)
}

func socket(net string, family, sotype, proto int, ipv6only bool, laddr, raddr sockaddr, deadline time.Time) (fd *netFD, err error) {
   // 建立底層socket,設定屬性為O_NONBLOCK
   s, err := sysSocket(family, sotype, proto)
   ......
   setDefaultSockopts(s, family, sotype, ipv6only)
   // 建立新netFD結構
   fd, err = newFD(s, family, sotype, net)
   ......
   if laddr != nil && raddr == nil {
       switch sotype {
       case syscall.SOCK_STREAM, syscall.SOCK_SEQPACKET:
           // 呼叫底層listen監聽建立的套接字
           fd.listenStream(laddr, listenerBacklog)
           return fd, nil
       case syscall.SOCK_DGRAM:
           ......
       }
   }   
}

// 最終呼叫該函式來建立一個socket
// 並且將socket屬性設定為O_NONBLOCK
func sysSocket(family, sotype, proto int) (int, error) {
   syscall.ForkLock.RLock()
   s, err := syscall.Socket(family, sotype, proto)
   if err == nil {
       syscall.CloseOnExec(s)
   }
   syscall.ForkLock.RUnlock()
   if err != nil {
       return -1, err
   }
   if err = syscall.SetNonblock(s, true); err != nil {
       syscall.Close(s)
       return -1, err
   }
   return s, nil
}

func (fd *netFD) listenStream(laddr sockaddr, backlog int) error {
   if err := setDefaultListenerSockopts(fd.sysfd)
   if lsa, err := laddr.sockaddr(fd.family); err != nil {
       return err
   } else if lsa != nil {
       // Bind繫結至該socket
       if err := syscall.Bind(fd.sysfd, lsa); err != nil {
           return os.NewSyscallError("bind", err)
       }
   }
   // 監聽該socket
   if err := syscall.Listen(fd.sysfd, backlog); 
   // 這裡非常關鍵:初始化socket與非同步IO相關的內容
   if err := fd.init(); err != nil {
       return err
   }
   lsa, _ := syscall.Getsockname(fd.sysfd)
   fd.setAddr(fd.addrFunc()(lsa), nil)
   return nil
}

我們這裡看到了如何實現Listen。流程基本都很簡單,但是因為我們使用了非同步程式設計,因此,我們在Listen完該socket後,還必須將其新增到監聽佇列中,以後該socket有事件到來時能夠及時通知到。

對linux有所瞭解的應該都知道epoll,沒錯golang使用的就是epoll機制來實現socket事件通知。那我們看對一個監聽socket,是如何將其新增到epoll的監聽佇列中呢?

func (fd *netFD) init() error {
   if err := fd.pd.Init(fd); err != nil {
       return err
   }
   return nil
}

func (pd *pollDesc) Init(fd *netFD) error {
   // 利用了Once機制,保證一個程序只會執行一次
   // runtime_pollServerInit: 
   // TEXT net·runtime_pollServerInit(SB),NOSPLIT,$0-0
   // JMP runtime·netpollServerInit(SB)
   serverInit.Do(runtime_pollServerInit)
   // runtime_pollOpen:
   // TEXT net·runtime_pollOpen(SB),NOSPLIT,$0-0
   // JMP runtime·netpollOpen(SB)
   ctx, errno := runtime_pollOpen(uintptr(fd.sysfd))
   if errno != 0 {
       return syscall.Errno(errno)
   }
   pd.runtimeCtx = ctx
   return nil
}

這裡就是socket非同步程式設計的關鍵:

netpollServerInit()初始化非同步程式設計結構,對於epoll,該函式是netpollinit,且使用Once機制保證一個程序 只會初始化一次;

func netpollinit() {
    epfd = epollcreate1(_EPOLL_CLOEXEC)
    if epfd >= 0 {
        return
    }
    epfd = epollcreate(1024)
    if epfd >= 0 {
        closeonexec(epfd)
        return
    }
    ......
}

netpollOpen則在socket被創建出來後將其新增到epoll佇列中,對於epoll,該函式被例項化為netpollopen

func netpollopen(fd uintptr, pd *pollDesc) int32 {
   var ev epollevent
   ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
   *(**pollDesc)(unsafe.Pointer(&ev.data)) = pd
   return -epollctl(epfd, _EPOLL_CTL_ADD, int32(fd), &ev)
}

OK,看到這裡,我們也就明白了,監聽一個套接字的時候無非就是傳統的socket非同步程式設計,然後將該socket新增到 epoll的事件監聽佇列中。

Accept

既然我們描述的重點的tcp協議,因此,我們看看TCPListener的Accept方法是怎麼實現的:

func (l *TCPListener) Accept() (Conn, error) {
    c, err := l.AcceptTCP()
    ......
}

func (l *TCPListener) AcceptTCP() (*TCPConn, error) {
    ......
    fd, err := l.fd.accept()
    ......
    // 返回給呼叫者一個新的TCPConn
    return newTCPConn(fd), nil
}

func (fd *netFD) accept() (netfd *netFD, err error) {
    // 為什麼對該函式加讀鎖?
    if err := fd.readLock(); err != nil {
        return nil, err
    }
    defer fd.readUnlock()
    ......
    for {
        // 這個accept是golang包裝的系統呼叫
        // 用來處理跨平臺
        s, rsa, err = accept(fd.sysfd)
        if err != nil {
            if err == syscall.EAGAIN {
                // 如果沒有可用連線,WaitRead()阻塞該協程
                // 後面會詳細分析WaitRead.
                if err = fd.pd.WaitRead(); err == nil {
                    continue
                }
            } else if err == syscall.ECONNABORTED {
                // 如果連線在Listen queue時就已經被對端關閉
                continue
            }
        }
        break
    }

    netfd, err = newFD(s, fd.family, fd.sotype, fd.net)
    ......
    // 這個前面已經分析,將該fd新增到epoll佇列中
    err = netfd.init()
    ......
    lsa, _ := syscall.Getsockname(netfd.sysfd)
    netfd.setAddr(netfd.addrFunc()(lsa), netfd.addrFunc()(rsa))
    return netfd, nil
}

OK,從前面的程式設計事例中我們知道,一般在主協程中會accept新的connection,使用非同步程式設計我們知道,如果沒有 新連線到來,該協程會一直被阻塞,直到新連線到來有人喚醒了該協程。

一般在主協程中呼叫accept,如果返回值為EAGAIN,則呼叫WaitRead來阻塞當前協程,後續在該socket有事件到來時被喚醒,WaitRead以及喚醒過程我們會在後面仔細分析。

Read

func (c *conn) Read(b []byte) (int, error) {
    if !c.ok() {
        return 0, syscall.EINVAL
    }
    return c.fd.Read(b)
}

func (fd *netFD) Read(p []byte) (n int, err error) {
    // 為什麼對函式呼叫加讀鎖
    if err := fd.readLock(); err != nil {
        return 0, err
    }
    defer fd.readUnlock()
    // 這個又是幹嘛?
    if err := fd.pd.PrepareRead(); err != nil {
        return 0, &OpError{"read", fd.net, fd.raddr, err}
    }
    for {
        n, err = syscall.Read(int(fd.sysfd), p)
        if err != nil {
            n = 0
            // 如果返回EAGIN,阻塞當前協程直到有資料可讀被喚醒
            if err == syscall.EAGAIN {
                if err = fd.pd.WaitRead(); err == nil {
                    continue
                }
            }
        }
        // 檢查錯誤,封裝io.EOF
        err = chkReadErr(n, err, fd)
        break
    }
    if err != nil && err != io.EOF {
        err = &OpError{"read", fd.net, fd.raddr, err}
    }
    return
}

func chkReadErr(n int, err error, fd *netFD) error {
    if n == 0 && err == nil && fd.sotype != syscall.SOCK_DGRAM && fd.sotype != syscall.SOCK_RAW {
        return io.EOF
    }
    return err
}

Read的流程與Accept流程極其一致,閱讀起來也很簡單。相信不用作過多解釋,自己看吧。 需要注意的是每次Read不能保證可以讀到想讀的那麼多內容,比如緩衝區大小是10,而實際可能只讀到5,應用程式需要能夠處理這種情況。

Write

func (fd *netFD) Write(p []byte) (nn int, err error) {
    // 為什麼這裡加寫鎖
    if err := fd.writeLock(); err != nil {
        return 0, err
    }
    defer fd.writeUnlock()
    // 這個是幹什麼?
    if err := fd.pd.PrepareWrite(); err != nil {
        return 0, &OpError{"write", fd.net, fd.raddr, err}
    }
    // nn記錄總共寫入的資料量,每次Write可能只能寫入部分資料
    for {
        var n int
        n, err = syscall.Write(int(fd.sysfd), p[nn:])
        if n > 0 {
            nn += n
        }
        // 如果陣列資料已經全部寫完,函式返回
        if nn == len(p) {
            break
        }
        // 如果寫入資料時被block了,阻塞當前協程
        if err == syscall.EAGAIN {
            if err = fd.pd.WaitWrite(); err == nil {
                continue
            }
        }
        if err != nil {
            n = 0
            break
        }
        // 如果返回值為0,代表了什麼?
        if n == 0 {
            err = io.ErrUnexpectedEOF
            break
        }
    }
    if err != nil {
        err = &OpError{"write", fd.net, fd.raddr, err}
    }
    return nn, err
}

注意Write語義與Read不一樣的地方:

Write儘量將使用者緩衝區的內容全部寫入至底層socket,如果遇到socket暫時不可寫入,會阻塞當前協程; Read在某次讀取成功時立即返回,可能會導致讀取的資料量少於使用者緩衝區的大小; 為什麼會在實現上有此不同,我想可能read的優先順序比較高吧,應用程式可能一直在等著,我們不能等到資料一直讀完才返回,會阻塞使用者。 而寫不一樣,優先順序相對較低,而且使用者一般也不著急寫立即返回,所以可以將所有的資料全部寫入,而且這樣 也能簡化應用程式的寫法。

總結

上面我們基本說完了golang網路程式設計內的關鍵API流程,我們遺留了一個關鍵內容:當系統呼叫返回EAGAIN時,會 呼叫WaitRead/WaitWrite來阻塞當前協程,我會在接下來的章節中繼續分析。