1. 程式人生 > >beehive 原始碼閱讀- go 語言的自動化機器

beehive 原始碼閱讀- go 語言的自動化機器

Overview

看一下作者本人的註釋

 

// Package bees is Beehive's central module system.

 

beehive 非常有趣的在於各邏輯的解耦設計,這不僅讓本身功能操作簡單,也讓擴充套件變得關注點少了很多,只需要一點學習成本就可以擴充套件自己的 beehive

 

首先解釋一下 bee hive 中 的概念

 

bee 代表的是我們常見的 Worker 也就是說,實際的行為是由這些 小蜜蜂執行的。他們就類似於採蜜的工人,採集到了之後統一放回來處理

 

hive 是蜂房,也就是我們常見的 WorkerPool 不同的是,她更像一個 Facotry ,什麼意思呢?她可以建立專屬的 bee 。在極少的配置下,比如只需要配置上一個 token 即可。就可以生成一隻 bee 專門針對某一種蜜工作了。

 

chain 又是什麼? chain 就是連結事件與處理的工具,我們認為 bee 採回蜜是一個事件,總不可能採回來啥都不幹吧。針對不同的 蜜 我們就有不同的反應,就有不同的 action

比如某人的 blog 更新了 ,rss bee 接收到了之後飛回來,我們就可以再要求 email bee 把這其中的資訊通過郵件發給我們或者我們想發給的人。

這就需要 chain 來聯絡 event 和 action 了

Landscape

 

API Register

 

beehive-api (1).svg

成組的 API 實現了 Resource 介面註冊到 Container 的路由 Route 中。

 

幫助理解

在檔案中寫好了 作者 實現的 Handle 的實現來註冊 http 請求

image.png

 

API 只是供呼叫,邏輯重點在 bees 這個包裡的實現。

 

Bee

首先是有的介面

// BeeInterface is an interface all bees implement.
type BeeInterface interface {
  // Name of the bee
  Name() string
  // Namespace of the bee
  Namespace() string

  // Description of the bee
  Description() string
  // SetDescription sets a description
  SetDescription(s string)

  // Config returns this bees config
  Config() BeeConfig
  // Options of the bee
  Options() BeeOptions
  // SetOptions to configure the bee
  SetOptions(options BeeOptions)

  // ReloadOptions gets called after a bee's options get updated
  ReloadOptions(options BeeOptions)

  // Activates the bee
  Run(eventChannel chan Event)
  // Running returns the current state of the bee
  IsRunning() bool
  // Start the bee
  Start()
  // Stop the bee
  Stop()

  LastEvent() time.Time
  LogEvent()
  LastAction() time.Time
  LogAction()

  Logln(args ...interface{})
  Logf(format string, args ...interface{})
  LogErrorf(format string, args ...interface{})
  LogFatal(args ...interface{})

  SetSigChan(c chan bool)
  WaitGroup() *sync.WaitGroup

  // Handles an action
  Action(action Action) []Placeholder
}

 

和他的基礎實現

 

// Bee is the base-struct to be embedded by bee implementations.
type Bee struct {
  config BeeConfig

  lastEvent  time.Time
  lastAction time.Time

  Running   bool
  SigChan   chan bool
  waitGroup *sync.WaitGroup
}

 

這裡需要注意的是 Run 介面,在 Base-Struct Bee 中該方法 是空的實現,因為 Run 是 Bee 的生命週期開始處,是自動開始的。

 

WebBee 實現

簡單的看某一個實現即可

 

// WebBee is a Bee that starts an HTTP server and fires events for incoming
// requests.
type WebBee struct {
  bees.Bee

  addr string

  eventChan chan bees.Event
}

 

可以很清楚的指導,這個 WebBee 中的 eventChan 正是通知的地方,也就是上文所說的 Chain 的開始處。注意的是由於鬆耦合的設計,任何 Bee 都可以成為 Chain 上的一環,只要它能觸發事件。或者監聽事件。

 

func (mod *WebBee) Run(cin chan bees.Event)

 

// Run executes the Bee's event loop.
func (mod *WebBee) Run(cin chan bees.Event) {
  mod.eventChan = cin

  srv := &http.Server{Addr: mod.addr, Handler: mod}
  l, err := net.Listen("tcp", mod.addr)
  if err != nil {
    mod.LogErrorf("Can't listen on %s", mod.addr)
    return
  }
  defer l.Close()

  go func() {
    err := srv.Serve(l)
    if err != nil {
      mod.LogErrorf("Server error: %v", err)
    }
    // Go 1.8+: srv.Close()
  }()

  select {
  case <-mod.SigChan:
    return
  }
}

 

同時 WebBee 也有一個方法 ServeHTTP 來實現 http.Handle 來處理請求。

這裡也就是前文所說的 註冊的那些 API 的部分來源,每一個 bee 自身實現的自動註冊暴露給外界呼叫。

func (mod *WebBee) ServeHTTP(w http.ResponseWriter, req *http.Request)

 

Event

package:beehive/bees/event.go

剛才講到了 觸發事件 event 的 WebBee 實現,現在我們來看 event 的實現

 

實際上是通過 這個函式實現的

 

// handleEvents handles incoming events and executes matching Chains.
func handleEvents() {
  for {
    event, ok := <-eventsIn
    ···
    bee := GetBee(event.Bee)
    (*bee).LogEvent()

    ···
    go func() {
      defer func() {
        if e := recover(); e != nil {
          log.Printf("Fatal chain event: %s %s", e, debug.Stack())
        }
      }()

      execChains(&event)
    }()
  }
}

省略了 日誌部分。可以看到 handleEvents 通過接受通道里的 event,並檢查 event 中的 Bee 作為 標誌找到對應的 Bee 喚醒。

 

這裡我們可以看到 最後進入了 Chains 中執行,即上文所說的 Chain 將 Event 和 Action 連結了起來,讓 Bee 之間能夠協作。

 

chain

package:beehive/bees/chains.go

chain 中實際上是呼叫 Actions 通過下面的 execActions 函式

for _, el := range c.Actions {
      action := GetAction(el)
      if action == nil {
        log.Println("\t\tERROR: Unknown action referenced!")
        continue
      }
      execAction(*action, m)
    }

 

我們來看看 Action 的執行。

 

Action Exec

package: beehive/bees/actions.go

actions 既可以執行設定中的 options 也可以直接在 執行函式中傳入需要執行的 options

func execAction(action Action, opts map[string]interface{}) bool

 

bee-actions (1).svg

 

Summary

整個執行邏輯是如此了,其他還有一些 

  • 日誌處理:用於跟蹤 bee 和 hive 的情況
  • Config:儲存配置檔案,每一次啟動可以重新放飛以前的 Bee 們
  • Signal:beehive 攔截了一些 Signal 的 Kill 等訊號來執行優雅退出,避免了 Config 等的丟失。
  • Run Flag:執行的附帶引數,設定 Beehive 整個應用的監聽埠和