1 以太坊的事件機制

以太坊go-ethereum原始碼中傳送事件除了用常規的通道以外,還用了封裝的Feed結構來執行事件的訂閱和傳送。以太坊中使用了大量的Feed來處理事件。使用Feed訂閱事件的步驟是:

  • 定義一個通道ch:ch=make(someType)
  • 定義一個Feed物件feed
  • Feed訂閱通道ch:feed.Subscribe(ch)
  • 使用feed傳送資料給通道:feed.Send(someTypeData)
  • ch接收資料:ret<-ch

一個feed可以訂閱多個通道,當使用feed傳送資料後,所有的通道都將接收到資料。下文將解讀Feed的原始碼,在進入Feed原始碼解讀之前我們先介紹一下go中的reflect包中的SelectCase。

2 使用reflect.SelectCase來監聽多個通道

對於多個通道ch1,ch2,ch3,使用傳統的Select方式來監聽:

package main
 
import (
	"fmt"
	"strconv"
)
 
func main() {
	var chs1 = make(chan int)
	var chs2 = make(chan float64)
	var chs3 = make(chan string)
	var ch4close = make(chan int)
	defer close(ch4close)
 
	go func(c chan int, ch4close chan int) {
		for i := 0; i < 5; i++ {
			c <- i
		}
		close(c)
		ch4close <- 1
	}(chs1, ch4close)
 
	go func(c chan float64, ch4close chan int) {
		for i := 0; i < 5; i++ {
			c <- float64(i) + 0.1
		}
		close(c)
		ch4close <- 1
	}(chs2, ch4close)
 
	go func(c chan string, ch4close chan int) {
		for i := 0; i < 5; i++ {
			c <- "string:" + strconv.Itoa(i)
		}
		close(c)
		ch4close <- 1
	}(chs3, ch4close)
 
	done := 0
	finished := 0
	for finished < 3 {
		select {
		case v, ok := <-chs1:
			if ok {
				done = done + 1
				fmt.Println(0, v)
			}
		case v, ok := <-chs2:
			if ok {
				done = done + 1
				fmt.Println(1, v)
			}
		case v, ok := <-chs3:
			if ok {
				done = done + 1
				fmt.Println(2, v)
			}
		case _, ok := <- ch4close:
			if ok {
				finished = finished+1
			}
		}
	}
	fmt.Println("Done", done)
}

使用reflect的方式來監聽:

package main
 
import (
	"fmt"
	"reflect"
	"strconv"
)
 
func main() {
	var chs1 = make(chan int)
	var chs2 = make(chan float64)
	var chs3 = make(chan string)
	var ch4close = make(chan int)
	defer close(ch4close)
 
	go func(c chan int, ch4close chan int) {
		for i := 0; i < 5; i++ {
			c <- i
		}
		close(c)
		ch4close <- 1
	}(chs1, ch4close)
 
	go func(c chan float64, ch4close chan int) {
		for i := 0; i < 5; i++ {
			c <- float64(i) + 0.1
		}
		close(c)
		ch4close <- 1
	}(chs2, ch4close)
 
	go func(c chan string, ch4close chan int) {
		for i := 0; i < 5; i++ {
			c <- "string:" + strconv.Itoa(i)
		}
		close(c)
		ch4close <- 1
	}(chs3, ch4close)
 
 
	var selectCase = make([]reflect.SelectCase, 4)
	selectCase[0].Dir = reflect.SelectRecv
	selectCase[0].Chan = reflect.ValueOf(chs1)
 
	selectCase[1].Dir = reflect.SelectRecv
	selectCase[1].Chan = reflect.ValueOf(chs2)
 
	selectCase[2].Dir = reflect.SelectRecv
	selectCase[2].Chan = reflect.ValueOf(chs3)
 
	selectCase[3].Dir = reflect.SelectRecv
	selectCase[3].Chan = reflect.ValueOf(ch4close)
 
	done := 0
	finished := 0
	for finished < len(selectCase)-1 {
		chosen, recv, recvOk := reflect.Select(selectCase)
 
		if recvOk {
			done = done+1
			switch chosen {
			case 0:
				fmt.Println(chosen, recv.Int())
			case 1:
				fmt.Println(chosen, recv.Float())
			case 2:
				fmt.Println(chosen, recv.String())
			case 3:
				finished = finished+1
				done = done-1
				// fmt.Println("finished\t", finished)
			}
		}
	}
	fmt.Println("Done", done)
 
}

這裡構建了一個reflect.SelectCase陣列selectCase,將要監聽的通道新增到陣列中。監聽時只要使用reflect.Select(selectCase)就可以監聽所有通道的訊息。當通道數多的時候,用SelectCase的方式將會更簡潔優雅。

3 Feed原始碼解讀

Feed結構的原始碼在event/feed.go中。

Feed結構

type Feed struct {
	once      sync.Once        // ensures that init only runs once
	sendLock  chan struct{}    // sendLock has a one-element buffer and is empty when held.It protects sendCases.
	removeSub chan interface{} // interrupts Send
	sendCases caseList         // the active set of select cases used by Send

	// The inbox holds newly subscribed channels until they are added to sendCases.
	mu     sync.Mutex
	inbox  caseList
	etype  reflect.Type
	closed bool
}

type caseList []reflect.SelectCase

Feed結構核心的是inbox成員,它是一個SelectCase的陣列,儲存了該Feed訂閱的所有通道。sendCase是所有活躍的通道陣列。sendLock通道用來作為鎖來保護sendCase。

初始化函式

func (f *Feed) init() {
	f.removeSub = make(chan interface{})
	f.sendLock = make(chan struct{}, 1)
	f.sendLock <- struct{}{}
	f.sendCases = caseList{{Chan: reflect.ValueOf(f.removeSub), Dir: reflect.SelectRecv}}
}

這裡sendLock被設定成有容量為1的緩衝通道。並且給sendLock先寫入了一個值。sendCases預先加入了removeSub通道作為第一個通道。

通道訂閱函式


//這個通道需要有足夠的緩衝空間以避免阻塞其它訂閱者。速度慢的訂閱者不會被丟棄
func (f *Feed) Subscribe(channel interface{}) Subscription {
	f.once.Do(f.init)

	chanval := reflect.ValueOf(channel)
	chantyp := chanval.Type()
	if chantyp.Kind() != reflect.Chan || chantyp.ChanDir()&reflect.SendDir == 0 {
		panic(errBadChannel)
	}
	sub := &feedSub{feed: f, channel: chanval, err: make(chan error, 1)}

	f.mu.Lock()
	defer f.mu.Unlock()
	if !f.typecheck(chantyp.Elem()) {
		panic(feedTypeError{op: "Subscribe", got: chantyp, want: reflect.ChanOf(reflect.SendDir, f.etype)})
	}
	// Add the select case to the inbox.
	// The next Send will add it to f.sendCases.
	cas := reflect.SelectCase{Dir: reflect.SelectSend, Chan: chanval}
	f.inbox = append(f.inbox, cas)
	return sub
}

這個函式做的事情很簡單,就是根據通道ch構造一個SelectCase物件,然後將其加入到inbox陣列中。這樣就完成了通道的訂閱。

傳送函式

// Send delivers to all subscribed channels simultaneously.
// It returns the number of subscribers that the value was sent to.
func (f *Feed) Send(value interface{}) (nsent int) {
	rvalue := reflect.ValueOf(value)

	f.once.Do(f.init)//重新初始化,onece.Do保證只會執行一次
	<-f.sendLock    //讀sendLock通道,若sendLock為空則會堵塞

	// Add new cases from the inbox after taking the send lock.
	f.mu.Lock()    //訪問公共變數加鎖
	f.sendCases = append(f.sendCases, f.inbox...)//將inbox注入到sendCase
	f.inbox = nil 

	if !f.typecheck(rvalue.Type()) {
		f.sendLock <- struct{}{}    //出錯了,退出前先寫sendLock以免下次send操作堵塞
		panic(feedTypeError{op: "Send", got: rvalue.Type(), want: f.etype})
	}
	f.mu.Unlock()

	// 給所有通道設定要傳送的資料
	for i := firstSubSendCase; i < len(f.sendCases); i++ {
		f.sendCases[i].Send = rvalue
	}

	// Send until all channels except removeSub have been chosen. 'cases' tracks a prefix
	// of sendCases. When a send succeeds, the corresponding case moves to the end of
	// 'cases' and it shrinks by one element.
	cases := f.sendCases
	
	for {
		// Fast path: try sending without blocking before adding to the select set.
		// This should usually succeed if subscribers are fast enough and have free
		// buffer space.
		for i := firstSubSendCase; i < len(cases); i++ {
           //首先使用TrySend進行傳送,這是一種非阻塞操作。當訂閱者足夠快時一般能夠立即成功
			if cases[i].Chan.TrySend(rvalue) {
				nsent++
				cases = cases.deactivate(i)//傳送成功,後移該通道
				i--
			}
		}
		if len(cases) == firstSubSendCase {//所有通道傳送完成,退出
			break
		}
		// Select on all the receivers, waiting for them to unblock.
		chosen, recv, _ := reflect.Select(cases)//等待通道返回
		//<-f.removeSub
		if chosen == 0  {
			index := f.sendCases.find(recv.Interface())
			f.sendCases = f.sendCases.delete(index)
			if index >= 0 && index < len(cases) {
				// Shrink 'cases' too because the removed case was still active.
				cases = f.sendCases[:len(cases)-1]
			}
		} else {
			cases = cases.deactivate(chosen)
			nsent++
		}
	}

	// Forget about the sent value and hand off the send lock.
	for i := firstSubSendCase; i < len(f.sendCases); i++ {
		f.sendCases[i].Send = reflect.Value{}
	}
	f.sendLock <- struct{}{}//返回時寫入sendLock,為下次傳送做準備
	return nsent
}

send函式使用通道的trySend方法來發送,在正常情況下能夠立即傳送成功,但是當接收通道堵塞的時候,則需要用Select方法這種堵塞的方式等待通道傳送成功。在最後返回時,寫入sendLock,為下次傳送做準備。

4 send函式存在的問題及優化

我們看到send函式使用了sendLock通道,它是一個容量為1的通道。在send函式最開始,讀出sendLock通道,如果這個時候sendLock為空,則send函式就會堵塞。所以在send函式最後,寫入了sendLock通道,這樣下次傳送去讀sendLock時就不會堵塞。看起來好像沒有問題,但是理想很豐滿,顯示有時候會骨感。這裡存在的問題就是chosen, recv, _ := reflect.Select(cases)這行程式碼可能會堵塞,導致for迴圈一值退不出,send函式發生堵塞,導致sendLock不會被寫入。從而導致了死鎖。下次send傳送就會被堵塞。

這裡使用sendLock是為了保護公共的sendCase資料,解決思路是去掉sendCase,不適用全域性的sendCase,而使用區域性變數。這樣就不用考慮同步的問題了。改造後的send函式:

func (f *Feed) Send(value interface{}) (nsent int) {
	rvalue := reflect.ValueOf(value)

	f.once.Do(f.init)
	//<-f.sendLock

	sendCases := caseList{{Chan: reflect.ValueOf(f.removeSub), Dir: reflect.SelectRecv}}
	sendCases = append(sendCases, f.inbox...)

	// Set the sent value on all channels.
	for i := firstSubSendCase; i < len(sendCases); i++ {
		sendCases[i].Send = rvalue
	}

	// Send until all channels except removeSub have been chosen. 'cases' tracks a prefix
	// of sendCases. When a send succeeds, the corresponding case moves to the end of
	// 'cases' and it shrinks by one element.
	cases := sendCases
	//LOOP:
	for {
		// Fast path: try sending without blocking before adding to the select set.
		// This should usually succeed if subscribers are fast enough and have free
		// buffer space.
		for i := firstSubSendCase; i < len(cases); i++ {
			if cases[i].Chan.TrySend(rvalue) {
				nsent++
				cases = cases.deactivate(i)
				i--
			}
		}
		if len(cases) == firstSubSendCase {
			break
		}
		// Select on all the receivers, waiting for them to unblock.
		chosen, recv, _ := reflect.Select(cases)
		//<-f.removeSub
		if chosen == 0  {
			index := f.sendCases.find(recv.Interface())
			f.sendCases = f.sendCases.delete(index)
			if index >= 0 && index < len(cases) {
				// Shrink 'cases' too because the removed case was still active.
				cases = f.sendCases[:len(cases)-1]
			}
		} else {
			cases = cases.deactivate(chosen)
			nsent++
		}
	}

	// Forget about the sent value and hand off the send lock.
	for i := firstSubSendCase; i < len(f.sendCases); i++ {
		f.sendCases[i].Send = reflect.Value{}
	}
	//f.sendLock <- struct{}{}
	return nsent
}

某次send可能會堵塞,但是不會影響下次send傳送。

5 go-ethereum原始碼中使用send的坑

 我們看core/blockchain.go中的傳送函式PostChainEvents():

// PostChainEvents iterates over the events generated by a chain insertion and
// posts them into the event feed.
// TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock.
func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
	log.Info("lzj-log PostChainEvents", "events len",len(events))
	// post event logs for further processing
	if logs != nil {
		bc.logsFeed.Send(logs)
	}
	for _, event := range events {
		switch ev := event.(type) {
		case ChainEvent:
			log.Info("lzj-log send ChainEvent")
			bc.chainFeed.Send(ev)

		case ChainHeadEvent:
			log.Info("lzj-log send ChainHeadEvent")
			bc.chainHeadFeed.Send(ev)

		case ChainSideEvent:
			log.Info("lzj-log send ChainSideEvent")
			bc.chainSideFeed.Send(ev)
		}
	}
}

這個函式是在for迴圈中先後傳送了ChainEvent、ChainHeadEvent和ChainSideEvent事件。在insert函式中呼叫了這個 函式。但是這裡有個問題,如果前一個事件傳送堵塞了,後面的事件傳送就不會執行。需要把Send函式放到單獨的協程中去。改成這樣可以防止堵塞的問題:

// PostChainEvents iterates over the events generated by a chain insertion and
// posts them into the event feed.
// TODO: Should not expose PostChainEvents. The chain events should be posted in WriteBlock.
func (bc *BlockChain) PostChainEvents(events []interface{}, logs []*types.Log) {
	log.Info("lzj-log PostChainEvents", "events len",len(events))
	// post event logs for further processing
	if logs != nil {
		bc.logsFeed.Send(logs)
	}
	for _, event := range events {
		switch ev := event.(type) {
		case ChainEvent:
			log.Info("lzj-log send ChainEvent")
			go bc.chainFeed.Send(ev)

		case ChainHeadEvent:
			log.Info("lzj-log send ChainHeadEvent")
			go bc.chainHeadFeed.Send(ev)

		case ChainSideEvent:
			log.Info("lzj-log send ChainSideEvent")
			go bc.chainSideFeed.Send(ev)
		}
	}
}

在go裡面使用通道要發非常小心,因為很容易引起堵塞從而達不到自己期望的結果。