1. 程式人生 > >InfluxDB原始碼編譯、安裝、配置及主從同步實現

InfluxDB原始碼編譯、安裝、配置及主從同步實現

先扯點蛋

公司有個專案要求使用InfluxDB時序資料庫儲存點東西。第一次聽說還有這種資料庫,哈哈哈,孤陋寡聞了,先從各位大佬的部落格看起,慢慢學習,逐漸瞭解了之後在伺服器上進行安裝。直接使用官方包進行安裝很簡單,檢視官方說明即可。安裝之後使用才發現,開源的只支援單機版的,但是公司用不能這麼low吧,怎麼也要“高可用”一點,於是自己參考MySQL主從複製原理、半同步操作步驟及原理餓了麼 Influxdb 實踐之路,做了InfluxDB主從系統。


客戶端系統

客戶端系統拓撲圖
InfluxDB主從同步拓撲圖

這個系統主要是用來從kafka獲取資料來源,經過處理之後存到InfluxDB中去,這裡參考了「餓了麼」那篇文章,但是我看過他們的原始碼,基本上搞不懂,就根據文章的描述搞了一個簡陋版的東西吧,這裡涉及到kafka和influxdb-java,專案原始碼可以看

這裡。該專案啟動執行參考其中的README。


InfluxDB主從同步系統

InfluxDB同步系統
InfluxDB同步系統

主從同步架構,是簡陋版的MySQL主從同步。指令碼讀取InfluxDB的增刪改操作日誌,使用指令碼將記錄寫入從機中。經過測試,執行狀況良好,只要主從機不掛,同步系統可以健康執行。

同步指令碼使用Python編寫,專案原始碼在這裡。該專案啟動執行參考其中的README。

為什麼使用Python指令碼來編寫主從同步程式碼?

  • 主要是考慮到降低與資料庫的程式碼耦合程度,InfluxDB原始碼如果出現大規模的升級改動,同步指令碼只需略作修改就可以了。
  • 畢竟Python是世界上**的語言。?

InfluxDB原始碼修改

  • 下載原始碼

    在github上下載InfluxDB原始碼(這裡使用的是我的fork地址,已經修改好的原始碼)。

  • 修改原始碼

    由於指令碼需要讀取InfluxDB的增刪改操作日誌,需要對原始碼中這一部分操作的日誌進行修改,方便指令碼解析日誌。官方版的日誌不記錄寫入操作的資料值,所以需要我們自己修改原始碼中對應的記錄寫日誌的地方。

    修改檔案influxdb/services/httpd/handler.go

    // 記錄日誌的具體方法
    func buildLogLine(l *responseLogger, r *http.Request
    , start time.Time, body string) string { redactPassword(r) username := parseUsername(r) host, _, err := net.SplitHostPort(r.RemoteAddr) if err != nil { host = r.RemoteAddr } if xff := r.Header["X-Forwarded-For"]; xff != nil { addrs := append(xff, host) host = strings.Join(addrs, ",") } uri := r.URL.RequestURI() referer := r.Referer() userAgent := r.UserAgent() // 新增請求中的請求路徑 path := r.URL.Path // 新增請求中的form值 r.ParseForm() form := r.Form // 新增請求中的body的值 newbody := strings.Replace(body, "\n", ";", -1) // 將日誌記錄變為json格式 return fmt.Sprintf(`{"timeindex":%d,"host":"%s","username":"%s","method":"%s","path":"%s","uri":"%s","form":"%s","body":"%s","proto":"%s","status":"%s","size":"%s","referer":"%s","agent":"%s","reqId":"%s"}`, start.Nanosecond(), host, detect(username, "-"), r.Method, path, uri, form, newbody, r.Proto, detect(strconv.Itoa(l.Status()), "-"), strconv.Itoa(l.Size()), detect(referer, "-"), detect(userAgent, "-"), r.Header.Get("Request-Id")) }

    修改檔案influxdb/services/httpd/handler.go其中兩處呼叫上面修改過的函式buildLogLine的地方

    func (h *Handler) logging(inner http.Handler, name string) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            start := time.Now()
            l := &responseLogger{w: w}
            inner.ServeHTTP(l, r)
            // 增加請求中的body
            h.CLFLogger.Println(buildLogLine(l, r, start, h.body))
    
            // Log server errors.
            if l.Status()/100 == 5 {
                errStr := l.Header().Get("X-InfluxDB-Error")
                if errStr != "" {
                    h.Logger.Error(fmt.Sprintf("[%d] - %q", l.Status(), errStr))
                }
            }
        })
    }
    func (h *Handler) recovery(inner http.Handler, name string) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            start := time.Now()
            l := &responseLogger{w: w}
    
            defer func() {
                if err := recover(); err != nil {
                    // 增加請求中的body
                    logLine := buildLogLine(l, r, start, h.body)
                    logLine = fmt.Sprintf("%s [panic:%s] %s", logLine, err, debug.Stack())
                    h.CLFLogger.Println(logLine)
                    http.Error(w, http.StatusText(http.StatusInternalServerError), 500)
                    atomic.AddInt64(&h.stats.RecoveredPanics, 1) // Capture the panic in _internal stats.
    
                    if willCrash {
                        h.CLFLogger.Println("\n\n=====\nAll goroutines now follow:")
                        buf := debug.Stack()
                        h.CLFLogger.Printf("%s\n", buf)
                        os.Exit(1) // If we panic then the Go server will recover.
                    }
                }
            }()
    
            inner.ServeHTTP(l, r)
        })
    }

    到此InfluxDB原始碼修改完成。

    為什麼要在func buildLogLine(l *responseLogger, r *http.Request, start time.Time, body string) string方法上增加一個欄位body,而body明明是從h *Handler中獲取的,而函式buildLogLine中明明是有r *http.Request的?

    因為經過測試,在func buildLogLine中從r *http.Request裡面實際上取不到body的值,我沒有深入研究過這段程式碼,確實不懂為什麼會這樣。所以才從上層將body的值直接傳入這個方法中。

編譯原始碼和執行

使用原始碼構建時序資料庫主從同步系統

系統要求

Linux 64位系統即可。

依賴環境

本系統使用InfluxDB v1.5.2,所以需要Go 1.9.2或者以上的版本。

InfluxDB使用Dep管理依賴包,需要安裝dep

主從同步指令碼使用Python 2.7開發。

需要從Github上拉取原始碼,環境中需要安裝git

安裝InfluxDB

新建目錄$YOUR_PATH/gocodez/src$YOUR_PATH/gocodez/src$YOUR_PATH為自定義目錄:

mkdir -p $YOUR_PATH/gocodez/src
mkdir -p $YOUR_PATH/gocodez/bin

將原始碼下載解壓到目錄$YOUR_PATH/gocodez/src中:

cd $YOUR_PATH/gocodez/src/
git clone https://github.com/callELPSYCONGROO/influxdb.git

將目錄$YOUR_PATH/設定為GOPATH

export GOPATH=$YOUR_PATH/

進入目錄中:

cd $YOUR_PATH/gocodez/src/influxdb

如果正確安裝了dep,這使用這個命令安裝依賴:

dep ensure

安裝依賴時,如果遇到無法連線到依賴所需的伺服器,需要檢視所缺依賴,手動將這些依賴原始碼下載到$YOUR_PATH/src/對應路徑下。所需依賴在Github上均有原始碼可以下載。

如果安裝或使用dep不成功,可以跳過此步,在下一步時根據錯誤提示,手動安裝所需依賴。

安裝依賴可以使用依賴包,將依賴包的/src解壓到$YOUR_PATH/gocodez目錄下即可。

構建安裝二進位制執行碼:

go clean ./...
go install ./...

安裝完成後,二進位制執行碼放在$YOUR_PATH/gocodez/bin/中,啟動時序資料庫:

$YOUR_PATH/gocodez/bin/influxd

安裝主從同步指令碼

新建目錄$YOUR_SCRIPT$YOUR_SCRIPT為自定義指令碼目錄:

mkdir $YOUR_SCRIPT

從Github中拉取指令碼:

cd $YOUR_SCRIPT
git clone https://github.com/callELPSYCONGROO/master_slave

參考其中的README.md完成指令碼配置和執行。