1. 程式人生 > >以太坊p2p網路(五):P2P模組TCP連線池網路通訊機制原始碼分析

以太坊p2p網路(五):P2P模組TCP連線池網路通訊機制原始碼分析

上節中通過設定靜態節點BootstrapNodes節點來發現更多全網的其他節點,這部分只是發現節點並找出其中可以ping通的節點,但是還沒有進行使用,還沒建立TCP連線進行資料傳輸,協議處理等。 這裡主要分析P2P系統的TCP連線池的建立,以及是怎麼跟其他節點通訊的。

一、TCP監聽

P2P網路服務啟動時呼叫Server.Start(),其後面部分程式碼:

p2p/server.go裡面的Start()函式後面部分程式碼

	dynPeers := srv.maxDialedConns()
	 //新建一個dialstate結構返回 , StaticNodes 會直接加到srv.static[]數組裡面
    //dialer是用來接收到上面的一堆管道事件後,管理nodes列表的, 傳送TCP連線的
	dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)

	// handshake
	srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: discover.PubkeyID(&srv.PrivateKey.PublicKey)}
	for _, p := range srv.Protocols {
		srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
	}

	if srv.ListenAddr != "" {
	//開始監聽TCP請求, TCP埠是需要加密的,傳輸的資訊比較敏感,所以有握手過程
		if err := srv.startListening(); err != nil {
			return err
		}
	}
	if srv.NoDial && srv.ListenAddr == "" {
		srv.log.Warn("P2P server will be useless, neither dialing nor listening")
	}

	srv.loopWG.Add(1)
	//開始監聽各個訊號量,處理peer的增刪改。
	go srv.run(dialer)
	srv.running = true
	return nil
}

這部分程式碼呼叫 startListening 開始監聽TCP請求, 後面建立server.run協程處理連線池管理任務。

startListening 建立一個TCP監聽控制代碼, 然後建立listenLoop協程進行不斷的監聽請求,進行Accept,然後握手,建立安全連線. startListening 建立協程後會立即返回。

p2p/server.go中的startListening()函式

func (srv *Server) startListening() error {
	// 先建立一個TCP監聽控制代碼, 然後建立listenLoop協程進行不斷的監聽請求,進行Accept,然後握手,建立安全連線
	listener, err := net.Listen("tcp", srv.ListenAddr)
	if err != nil {
		return err
	}
	laddr := listener.Addr().(*net.TCPAddr)
	srv.ListenAddr = laddr.String()
	srv.listener = listener
	srv.loopWG.Add(1)
	//進入協程監聽請求
	go srv.listenLoop()
	// 如果配置tcp監聽埠對映
	if !laddr.IP.IsLoopback() && srv.NAT != nil {
		srv.loopWG.Add(1)
		go func() {
			nat.Map(srv.NAT, srv.quit, "tcp", laddr.Port, laddr.Port, "ethereum p2p")
			srv.loopWG.Done()
		}()
	}
	return nil
}

listenLoop 協程的工作,分兩部分,初始化,接受連線, 以及連線建立。

func (srv *Server) listenLoop() {

   defer srv.loopWG.Done()
   srv.log.Info("RLPx listener up", "self", srv.makeSelf(srv.listener, srv.ntab))

   tokens := defaultMaxPendingPeers
   if srv.MaxPendingPeers > 0 {
   	tokens = srv.MaxPendingPeers
   }
   slots := make(chan struct{}, tokens)
   for i := 0; i < tokens; i++ {
   //先直接放入這麼多個slot進去,稍後就一個個獲取了,用來實現併發控制。
   //因為實際上的鏈家請求簡歷,是通過協程進行的,並非在本協程處理,所以用了管道的形式來控制併發。
   	slots <- struct{}{}
   }

   for {
   	//併發控制
   	<-slots

   	var (
   		fd  net.Conn
   		err error
   	)
   	for {
   	//接受連結, 檢查是否有問題,沒問題就break
   		fd, err = srv.listener.Accept()
   		if tempErr, ok := err.(tempError); ok && tempErr.Temporary() {
   			srv.log.Debug("Temporary read error", "err", err)
   			continue
   		} else if err != nil {
   			srv.log.Debug("Read error", "err", err)
   			return
   		}
   		break
   	}

二、TCP連線建立

srv.listener.Accept() 接受連線之後,便會建立新的協程專門處理這一個peer節點的請求。

    if srv.NetRestrict != nil {
   		if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok && !srv.NetRestrict.Contains(tcp.IP) {
   			srv.log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr())
   			fd.Close()
   			slots <- struct{}{}
   			continue
   		}
   	}

   	fd = newMeteredConn(fd, true)
   	srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
   	//建立協程去處理這個連結
   	go func() {
   	//TCP連線是加密的,所以需要進行兩邊的握手,互相交換公鑰
   		srv.SetupConn(fd, inboundConn, nil)
   		slots <- struct{}{}
   	}()
   }

呼叫srv.SetupConn進行協議握手,TCP傳輸連結是加密傳輸的,目前使用的是RLPx加密協議,是在Start函式的srv.newTransport = newRLPX進行的賦值,則處理函式是newRLPX 。

func (srv *Server) SetupConn(fd net.Conn, flags connFlag, dialDest *discover.Node) error {
	self := srv.Self()
	if self == nil {
		return errors.New("shutdown")
	}
	//newTransport 目前使用的是RLPx加密協議,處理函式是newRLPX
	c := &conn{fd: fd, transport: srv.newTransport(fd), flags: flags, cont: make(chan error)}
	err := srv.setupConn(c, flags, dialDest)
	if err != nil {
		c.close(err)
		srv.log.Trace("Setting up connection failed", "id", c.id, "err", err)
	}
	return err
}

繼續呼叫setupConn 函式進行一個TCP協議上的握手功能。setupConn 主要是對握手協議的處理,傳送資料。

func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *discover.Node) error {
	// 進行tcp協議的握手
	srv.lock.Lock()
	running := srv.running
	srv.lock.Unlock()
	if !running {
		return errServerStopped
	}
	// Run the encryption handshake.
	var err error
	//這裡實際上執行的是rlpx.go裡面的doEncHandshake.因為transport是conn的一個匿名欄位。 匿名欄位的方法會直接作為conn的一個方法。
	if c.id, err = c.doEncHandshake(srv.PrivateKey, dialDest); err != nil {
		srv.log.Trace("Failed RLPx handshake", "addr", c.fd.RemoteAddr(), "conn", c.flags, "err", err)
		return err
	}
	clog := srv.log.New("id", c.id, "addr", c.fd.RemoteAddr(), "conn", c.flags)
	// For dialed connections, check that the remote public key matches.
	// 如果連線握手的ID和對應的ID不匹配
	if dialDest != nil && c.id != dialDest.ID {
		clog.Trace("Dialed identity mismatch", "want", c, dialDest.ID)
		return DiscUnexpectedIdentity
	}
	// 這個checkpoint其實就是把第一個引數傳送給第二個引數指定的佇列。然後從c.cout接收返回資訊。 是一個同步的方法。
	//至於這裡,後續的操作只是檢查了一下連線是否合法就返回了。
	err = srv.checkpoint(c, srv.posthandshake)
	if err != nil {
		clog.Trace("Rejected peer before protocol handshake", "err", err)
		return err
	}
	// Run the protocol handshake
	phs, err := c.doProtoHandshake(srv.ourHandshake)
	if err != nil {
		clog.Trace("Failed proto handshake", "err", err)
		return err
	}
	if phs.ID != c.id {
		clog.Trace("Wrong devp2p handshake identity", "err", phs.ID)
		return DiscUnexpectedIdentity
	}
	c.caps, c.name = phs.Caps, phs.Name
	// 這裡兩次握手都已經完成了。 把c傳送給addpeer佇列。 後臺處理這個佇列的時候,會處理這個連線
	err = srv.checkpoint(c, srv.addpeer)
	if err != nil {
		clog.Trace("Rejected peer", "err", err)
		return err
	}
	// If the checks completed successfully, runPeer has now been
	// launched by run.
	clog.Trace("connection set up", "inbound", dialDest == nil)
	return nil
}

srv.checkpoint(c, srv.addpeer)比較重要,會在addpeer 上面插入一條訊息,而訊息的另外一端就是 server.run()。 當最後協議握手完成後,會呼叫到下面的程式碼:

p2p/server.go中的run()函式
	...

select {
		...
		case c := <-srv.addpeer:
			// At this point the connection is past the protocol handshake.
			// Its capabilities are known and the remote identity is verified.
			err := srv.protoHandshakeChecks(peers, inboundCount, c)
			if err == nil {
				// The handshakes are done and it passed all checks.
				p := newPeer(c, srv.Protocols)
				// If message events are enabled, pass the peerFeed
				// to the peer
				if srv.EnableMsgEvents {
					p.events = &srv.peerFeed
				}
				name := truncateName(c.name)
				srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
				go srv.runPeer(p)
				peers[c.id] = p
				if p.Inbound() {
					inboundCount++
				}
			}
			// The dialer logic relies on the assumption that
			// dial tasks complete after the peer has been added or
			// discarded. Unblock the task last.
			select {
			case c.cont <- err:
			case <-srv.quit:
				break running
			}
		case pd := <-srv.delpeer:
			// A peer disconnected.
			d := common.PrettyDuration(mclock.Now() - pd.created)
			pd.log.Debug("Removing p2p peer", "duration", d, "peers", len(peers)-1, "req", pd.requested, "err", pd.err)
			delete(peers, pd.ID())
			if pd.Inbound() {
				inboundCount--
			}
		}
	}

protoHandshakeChecks 進行一些必要檢查後如果正確,那麼呼叫newPeer建立一個peer結構,在建立peer結構後,呼叫matchProtocols()函式先排序,再通過Name和Version區分出子協議,設定好返回子協議的offset等引數,然後建立協程go srv.runPeer§, 並記錄這個peer節點:peers[c.id] = p。 runPeer 的主要任務是進行回撥,以及建立了peer的時間處理,另外最主要的是呼叫peer.run啟動跟這個peer的協程讀寫迴圈。

func (srv *Server) runPeer(p *Peer) {
//跟一個peer連線建立完成後,呼叫這裡來啟動一個peer的維護,監聽工作
	if srv.newPeerHook != nil {
		srv.newPeerHook(p)
	}

	// 廣播新增一個peer
	srv.peerFeed.Send(&PeerEvent{
		Type: PeerEventTypeAdd,
		Peer: p.ID(),
	})

	// 執行協議
	remoteRequested, err := p.run()

	// 廣播peer銷燬
	srv.peerFeed.Send(&PeerEvent{
		Type:  PeerEventTypeDrop,
		Peer:  p.ID(),
		Error: err.Error(),
	})

	srv.delpeer <- peerDrop{p, err, remoteRequested}
}

三、連線池管理

server.run 主要任務是進行 接池管理協程,負責維護TCP連線列表, 協程裡面執行, 開始監聽各個訊號量,處理peer的增刪改。 上面說的startListening 主要進行被動的接受連結然後進行握手,最後加入到連線池。 而server.run則是一個事件迴圈和連線池管理功能。

server.run中startTasks函式 和 scheduleTasks函式 , 其功能如下:

  1. startTasks 用來跟引數陣列代表的節點一個個建立協程建立連線,並且每次活躍的正在建立連線的任務數不超過 maxActiveDialTasks。
  2. scheduleTasks 如果沒有足夠的節點,那麼呼叫這裡開始去嘗試從p2p discover 中找出更多節點來用。scheduleTasks 會呼叫startTasks進行連線。
	//用來跟引數陣列代表的節點一個個建立協程建立連線
	startTasks := func(ts []task) (rest []task) {
		i := 0
		//最多可以建立maxActiveDialTasks 個連線
		for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
			t := ts[i]
			srv.log.Trace("New dial task", "task", t)
			go func() { t.Do(srv); taskdone <- t }()
			//當前正在使用的連結
			runningTasks = append(runningTasks, t)
		}
		return ts[i:]
	}
	//用來掃描服務發現的P2P節點,建立TCP連線
	scheduleTasks := func() {
		//先嚐試用startTasks 呼叫 queuedTasks 列表,試圖一個個建立連結,不過可能由於maxActiveDialTasks 的限制,預設16個
		queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...)
		//如果已經建立的連結還不到16個,那麼嘗試從p2p discover  中找出更多節點來用。
		if len(runningTasks) < maxActiveDialTasks {
			nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now())
			queuedTasks = append(queuedTasks, startTasks(nt)...)
		}
	}

server.run 一開始在迴圈開始便呼叫scheduleTasks()函式來嘗試連結對端節點,如果不夠就從discover模組申請更多節點。其中比較重要的是呼叫Do 和 newTasks函式。

dialTask.Do 函式用來跟一個節點建立連結,節點為bootnodes, 以及s.ntab.ReadRandomNodes(s.randomNodes) 返回的節點。

func (t *dialTask) Do(srv *Server) {
	if t.dest.Incomplete() {
		if !t.resolve(srv) {
			return
		}
	}
	//連線某個TCP節點
	err := t.dial(srv, t.dest)
	if err != nil {
		log.Trace("Dial error", "task", t, "err", err)
		// Try resolving the ID of static nodes if dialing failed.
		if _, ok := err.(*dialError); ok && t.flags&staticDialedConn != 0 {
			if t.resolve(srv) {
				t.dial(srv, t.dest)
			}
		}
	}
}
func (t *dialTask) dial(srv *Server, dest *discover.Node) error {
//連線某個TCP節點,傳送一個TCP連線請求 
	fd, err := srv.Dialer.Dial(dest)
	if err != nil {
		return &dialError{err}
	}
	mfd := newMeteredConn(fd, false)
	//進行握手,建立安全連線
	return srv.SetupConn(mfd, t.flags, dest)
}

newTasks 按照一定優先順序,查詢可以連線的節點,首先從bootnode開始,然後從p2p 服務發現的節點中隨機取一些出來,返回這些節點給呼叫方。

func (s *dialstate) newTasks(nRunning int, peers map[discover.NodeID]*Peer, now time.Time) []task {
//按照一定優先順序,查詢可以連線的節點,首先從bootnode開始,然後從p2p 服務發現的節點中隨機取一些出來,返回這些節點給呼叫方。
	if s.start.IsZero() {
		s.start = now
	}

	var newtasks []task
	addDial := func(flag connFlag, n *discover.Node) bool {
	//檢查連線狀態 , 如果連線可用,那麼標記為正在連線,並且加到待連線任務newtasks 裡面
		if err := s.checkDial(n, peers); err != nil {
			log.Trace("Skipping dial candidate", "id", n.ID, "addr", &net.TCPAddr{IP: n.IP, Port: int(n.TCP)}, "err", err)
			return false
		}
		//標記正在進行
		s.dialing[n.ID] = flag
		newtasks = append(newtasks, &dialTask{flags: flag, dest: n})
		return true
	}

	// //首先判斷已經建立的連線的型別。如果是動態型別。那麼需要建立動態連結數量減少。
	needDynDials := s.maxDynDials
	for _, p := range peers {
		if p.rw.is(dynDialedConn) {
			needDynDials--
		}
	}
	
	//然後再判斷正在建立的連結。如果是動態型別。那麼需要建立動態連結數量減少
	for _, flag := range s.dialing {
		if flag&dynDialedConn != 0 {
			needDynDials--
		}
	}

	s.hist.expire(now)

	// //檢視所有的靜態型別。如果可以那麼也建立連結。
	for id, t := range s.static {
		err := s.checkDial(t.dest, peers)
		switch err {
		case errNotWhitelisted, errSelf:
			log.Warn("Removing static dial candidate", "id", t.dest.ID, "addr", &net.TCPAddr{IP: t.dest.IP, Port: int(t.dest.TCP)}, "err", err)
			delete(s.static, t.dest.ID)
		case nil:
			s.dialing[id] = t.flags
			newtasks = append(newtasks, t)
		}
	}

//如果當前還沒有任何連結。 而且20秒(fallbackInterval)內沒有建立任何連結。 那麼就使用bootnode建立連結。
	if len(peers) == 0 && len(s.bootnodes) > 0 && needDynDials > 0 && now.Sub(s.start) > fallbackInterval {
		bootnode := s.bootnodes[0]
		s.bootnodes = append(s.bootnodes[:0], s.bootnodes[1:]...)
		s.bootnodes = append(s.bootnodes, bootnode)

		if addDial(dynDialedConn, bootnode) {
			needDynDials--
		}
	}

//否則使用1/2的隨機節點建立連結。
	randomCandidates := needDynDials / 2
	if randomCandidates > 0 {
		n := s.ntab.ReadRandomNodes(s.randomNodes)
		for i := 0; i < randomCandidates && i < n; i++ {
			if addDial(dynDialedConn, s.randomNodes[i]) {
				needDynDials--
			}
		}
	}
	// Create dynamic dials from random lookup results, removing tried
	// items from the result buffer.
	i := 0
	for ; i < len(s.lookupBuf) && needDynDials > 0; i++ {
		if addDial(dynDialedConn, s.lookupBuf[i]) {
			needDynDials--
		}
	}
	s.lookupBuf = s.lookupBuf[:copy(s.lookupBuf, s.lookupBuf[i:])]
	// 如果就算這樣也不能建立足夠動態連結。 那麼建立一個discoverTask用來再網路上查詢其他的節點。放入lookupBuf。
	if len(s.lookupBuf) < needDynDials && !s.lookupRunning {
		s.lookupRunning = true
		newtasks = append(newtasks, &discoverTask{})
	}

// 如果當前沒有任何任務需要做,那麼建立一個睡眠的任務返回。
	if nRunning == 0 && len(newtasks) == 0 && s.hist.Len() > 0 {
		t := &waitExpireTask{s.hist.min().exp.Sub(now)}
		newtasks = append(newtasks, t)
	}
	return newtasks
}

server.run接下來就是簡單的進行各個管道的監聽,如果有事件便處理,主要還是涉及到握手協議的流程,以及進行peer的增刪改操作。

p2p/server.go中的server.run程式碼

running:
	for {
	...

 //下面開始監聽各個管道的事件,來處理peer的增刪改操作
		select {
		case <-srv.quit:
			// The server was stopped. Run the cleanup logic.
			break running
		case n := <-srv.addstatic:
			// This channel is used by AddPeer to add to the
			// ephemeral static peer list. Add it to the dialer,
			// it will keep the node connected.
			srv.log.Trace("Adding static node", "node", n)
			dialstate.addStatic(n)
		case n := <-srv.removestatic:
			// This channel is used by RemovePeer to send a
			// disconnect request to a peer and begin the
			// stop keeping the node connected.
			srv.log.Trace("Removing static node", "node", n)
			dialstate.removeStatic(n)
			if p, ok := peers[n.ID]; ok {
				p.Disconnect(DiscRequested)
			}
		case n := <-srv.addtrusted:
			// This channel is used by AddTrustedPeer to add an enode
			// to the trusted node set.
			srv.log.Trace("Adding trusted node", "node", n)
			trusted[n.ID] = true
			// Mark any already-connected peer as trusted
			if p, ok := peers[n.ID]; ok {
				p.rw.set(trustedConn, true)
			}
		case n := <-srv.removetrusted:
			// This channel is used by RemoveTrustedPeer to remove an enode
			// from the trusted node set.
			srv.log.Trace("Removing trusted node", "node", n)
			if _, ok := trusted[n.ID]; ok {
				delete(trusted, n.ID)
			}
			// Unmark any already-connected peer as trusted
			if p, ok := peers[n.ID]; ok {
				p.rw.set(trustedConn, false)
			}
		case op := <-srv.peerOp:
			// This channel is used by Peers and PeerCount.
			op(peers)
			srv.peerOpDone <- struct{}{}
		case t := <-taskdone:
			// A task got done. Tell dialstate about it so it
			// can update its state and remove it from the active
			// tasks list.
			srv.log.Trace("Dial task done", "task", t)
			dialstate.taskDone(t, time.Now())
			delTask(t)
		case c := <-srv.posthandshake:
			// A connection has passed the encryption handshake so
			// the remote identity is known (but hasn't been verified yet).
			if trusted[c.id] {
				// Ensure that the trusted flag is set before checking against MaxPeers.
				c.flags |= trustedConn
			}
			// TODO: track in-progress inbound node IDs (pre-Peer) to avoid dialing them.
			select {
			case c.cont <- srv.encHandshakeChecks(peers, inboundCount, c):
			case <-srv.quit:
				break running
			}
		case c := <-srv.addpeer:
			// At this point the connection is past the protocol handshake.
			// Its capabilities are known and the remote identity is verified.
			err := srv.protoHandshakeChecks(peers, inboundCount, c)
			if err == nil {
				// The handshakes are done and it passed all checks.
				p := newPeer(c, srv.Protocols)
				// If message events are enabled, pass the peerFeed
				// to the peer
				if srv.EnableMsgEvents {
					p.events = &srv.peerFeed
				}
				name := truncateName(c.name)
				srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
				go srv.runPeer(p)
				peers[c.id] = p
				if p.Inbound() {
					inboundCount++
				}
			}
			// The dialer logic relies on the assumption that
			// dial tasks complete after the peer has been added or
			// discarded. Unblock the task last.
			select {
			case c.cont <- err:
			case <-srv.quit:
				break running
			}
		case pd := <-srv.delpeer:
			// A peer disconnected.
			d := common.PrettyDuration(mclock.Now() - pd.created)
			pd.log.Debug("Removing p2p peer", "duration", d, "peers", len(peers)-1, "req", pd.requested, "err", pd.err)
			delete(peers, pd.ID())
			if pd.Inbound() {
				inboundCount--
			}
		}
	}

TCP模組靠startListening 來進行被動連線監聽, server.run進行主動的連線池管理,以及連線狀態跳轉,peer的增刪改操作。 server.run在如果連線數不夠,那麼會開始進行不斷的嘗試,按照一定優先順序去查詢可以連線的節點,首先從bootnode開始,然後從p2p 服務發現的節點中隨機取一些出來。