1. 程式人生 > >golang基礎-beego讀取配置_輸出log日誌、tailf元件讀取log、配置zookeeper_kafka、傳送log至kafka

golang基礎-beego讀取配置_輸出log日誌、tailf元件讀取log、配置zookeeper_kafka、傳送log至kafka

在前面3篇博文中,已經分別學習了 beego 讀取配置與輸出 log 日誌、tailf 元件讀取 log,以及配置 zookeeper、kafka 並傳送 log 至 kafka。

今天我們來整合這些demo,寫一個log日誌收集傳送kafka的小專案

專案的流程框架圖

這裡寫圖片描述

專案的結構圖:
這裡寫圖片描述

1、載入配置檔案loadConf,封裝結構體

[logs]
log_level=debug
log_path=E:\golang\go_pro\logs\logagent.log

[collect]
log_path=E:\golang\go_pro\logs\logagent.log
topic=nginx_log
chan_size=100

[kafka]
server_addr=192.168.21.8:9092

簡單說下配置資訊的作用
[logs]是log輸出級別,以及log輸出的檔案地址路徑
[collect]是要讀取的log日誌地址,然後利用topic,啟動goroutine傳送給kafka
[kafka]kafka關聯的ip埠

我們將配置資訊封裝成結構體,然後再定義一個全域性變數來使用

// appConfig is the package-level configuration object. loadConf assigns it,
// and initLogger and main read it afterwards.
var (
    appConfig *Config
)

// Config groups every setting the agent reads from its configuration file.
type Config struct {
    // logLevel comes from logs::log_level ("debug" when the key is empty).
    logLevel string
    // logPath comes from logs::log_path and is the file beego/logs writes to.
    logPath string

    // chanSize comes from collect::chan_size (100 when it cannot be parsed)
    // and is passed to tailf.InitTail as the message channel capacity.
    chanSize int
    // kafkaAddr comes from kafka::server_addr and is passed to kafka.InitKafka.
    kafkaAddr string
    // collectConf lists the log files to tail together with their topics.
    collectConf []tailf.CollectConf
}

在結構體中 collectConf 是一個切片(slice),因為傳送到 kafka 時,可能會有多組不同的「路徑 + topic」(此例我們只用了一組)

// CollectConf describes one log file to collect and the topic its lines are
// published under.
type CollectConf struct {
    // LogPath is the path of the log file to tail.
    LogPath string
    // Topic is the Kafka topic used for lines read from LogPath.
    Topic string
}

2、初始化beego的log元件

func initLogger()(err error) {

    config := make(map[string]interface{})
    config["filename"] = appConfig.logPath
    config["level"] = convertLogLevel(appConfig.logLevel)

    configStr, err := json.Marshal(config)
    if err != nil {
        fmt.Println("initLogger failed, marshal err:"
, err) return } logs.SetLogger(logs.AdapterFile, string(configStr)) //{"filename":"E:\\golang\\go_pro\\logs\\logagent.log","level":7} fmt.Println(string(configStr)) return }

3、初始化tailf

在初始化goroutine模組,輸出log日誌,我們需要設計幾個結構體
在初始化配置資訊中提到了結構體Config,裡面的 collectConf []tailf.CollectConf
我們再封裝如下2個結構體

TailObj 結構體是利用tail.Lines讀取CollectConf路徑下的資訊

// TailObj pairs one tail handle with the collect entry it was created for.
type TailObj struct {
    // tail is the handle returned by tail.TailFile; its Lines channel is
    // consumed by readFromTail.
    tail *tail.Tail
    // conf is the path/topic entry this tail was opened for.
    conf CollectConf
}

TailObjMgr 結構體負責管理所有的 TailObj:從 tail.Lines 讀到 CollectConf 路徑下的資訊後,會存放到 msgChan 管道中;tailObjs 可能包含多個不同的「路徑 + topic」(此例我們只用了一個)

// TailObjMgr keeps every TailObj created by InitTail plus the channel their
// reader goroutines publish to.
type TailObjMgr struct {
    // tailObjs holds one entry per tailed file, appended by InitTail.
    tailObjs []*TailObj
    // msgChan receives messages from readFromTail and is drained by GetOneLine.
    msgChan chan *TextMsg
}

然後將tailf初始化的操作貼出來

// InitTail creates the package-level manager with a message channel of
// capacity chanSize, opens a tail on every entry of conf and starts one reader
// goroutine per entry.
//
// It returns an error when conf is empty or when a file cannot be tailed; in
// the latter case tails opened for earlier entries keep running.
func InitTail(conf []CollectConf, chanSize int) (err error) {
    if len(conf) == 0 {
        return fmt.Errorf("invalid config for log collect, conf:%v", conf)
    }

    tailObjMgr = &TailObjMgr{msgChan: make(chan *TextMsg, chanSize)}

    // The same options apply to every file. Location is deliberately not set
    // (the original kept a seek-to-end option commented out).
    tailCfg := tail.Config{
        ReOpen:    true,
        Follow:    true,
        MustExist: false,
        Poll:      true,
    }

    // conf looks like [{E:\golang\go_pro\logs\logagent.log nginx_log}]
    for i := range conf {
        entry := conf[i]
        // Prints e.g. v--- {E:\golang\go_pro\logs\logagent.log nginx_log}
        fmt.Println("v---", entry)

        tailer, tailErr := tail.TailFile(entry.LogPath, tailCfg)
        if tailErr != nil {
            return tailErr
        }

        obj := &TailObj{tail: tailer, conf: entry}
        tailObjMgr.tailObjs = append(tailObjMgr.tailObjs, obj)

        go readFromTail(obj)
    }

    return nil
}

// readFromTail forwards every line received from tailObj's Lines channel to
// the manager's message channel, tagged with the topic configured for that
// file. It never returns.
//
// NOTE(review): when Lines is reported closed the loop only warns, sleeps
// 100ms and receives again; if the tail library never reopens that channel
// this repeats forever — confirm against the tail package and consider
// returning instead.
func readFromTail(tailObj *TailObj) {
    for {
        line, open := <-tailObj.tail.Lines
        if open {
            tailObjMgr.msgChan <- &TextMsg{
                Msg:   line.Text,
                Topic: tailObj.conf.Topic,
            }
            continue
        }

        logs.Warn("tail file close reopen, filename:%s\n", tailObj.tail.Filename)
        time.Sleep(100 * time.Millisecond)
    }
}

4、初始化kafka

// InitKafka creates the package-level synchronous producer for the broker at
// addr. The producer waits for all acks, picks partitions randomly and
// reports successes. The error from sarama is logged and returned.
func InitKafka(addr string) (err error) {
    producerConf := sarama.NewConfig()
    producerConf.Producer.Return.Successes = true
    producerConf.Producer.Partitioner = sarama.NewRandomPartitioner
    producerConf.Producer.RequiredAcks = sarama.WaitForAll

    brokers := []string{addr}
    if client, err = sarama.NewSyncProducer(brokers, producerConf); err != nil {
        logs.Error("init kafka producer failed, err:", err)
        return
    }

    // Record that this step finished.
    logs.Debug("init kafka succ")
    return
}

以前寫過關於kafka的例子,就不再詳細介紹了

5、tailf讀取

從管道中讀取即可

    // Take one message out of the channel (the receive blocks until a line is available).
        msg := tailf.GetOneLine()
        fmt.Println(msg)
// GetOneLine blocks until a message is available on the manager's channel and
// returns it.
func GetOneLine() (msg *TextMsg) {
    return <-tailObjMgr.msgChan
}

利用fmt.Println(msg)進行測試,我同時輸出到控制檯上

6、傳送資料kafka

// SendToKafka publishes data to topic through the package-level producer.
// It logs the partition and offset on success, logs the failure otherwise,
// and returns the producer's error.
func SendToKafka(data, topic string) (err error) {
    message := &sarama.ProducerMessage{
        Topic: topic,
        Value: sarama.StringEncoder(data),
    }

    partition, offset, err := client.SendMessage(message)
    if err == nil {
        logs.Debug("send succ, pid:%v offset:%v, topic:%v\n", partition, offset, topic)
        return
    }

    logs.Error("send message failed, err:%v data:%v topic:%v", err, data, topic)
    return
}

傳送kafka的操作也是寫過demo例子的,這裡就不再詳細介紹了

7、啟動zookeeper,kafka測試

這裡寫圖片描述
這裡寫圖片描述

8、檢視測試效果

終端輸出+log日誌輸出目錄
這裡寫圖片描述

以下是輸出到kafka的效果圖

這裡寫圖片描述

程式碼區

main.go

package main

import(
    "fmt"
    "github.com/astaxie/beego/logs"
    "logagent/kafka"
    "logagent/tailf"
    // "time"
)

// main wires the agent together: it loads logagent.conf, initialises the
// beego logger, starts tailing the configured files, connects the Kafka
// producer and then runs the forwarding loop.
func main() {
    // Load the settings from logagent.conf.
    filename := "E:/golang/go_pro/logagent.conf"
    err := loadConf("ini", filename)
    if err != nil {
        fmt.Printf("load conf failed, err:%v\n", err)
        panic("load conf failed")
    }

    // Initialise beego/logs and point it at the configured output file.
    err = initLogger()
    if err != nil {
        fmt.Printf("load logger failed, err:%v\n", err)
        panic("load logger failed")
    }

    // First check that logging is configured correctly: this line should show
    // up in logagent.log.
    logs.Debug("load conf succ, config:%v", appConfig)

    // Initialise the tailf component.
    // appConfig.collectConf looks like [{E:\golang\go_pro\logs\logagent.log nginx_log}]
    fmt.Println("appConfig.collectConf", appConfig.collectConf)
    err = tailf.InitTail(appConfig.collectConf, appConfig.chanSize)
    if err != nil {
        logs.Error("init tail failed, err:%v", err)
        return
    }
    // Check that tailf is configured correctly via logagent.log.
    logs.Debug("initialize tailf succ")

    // Initialise the Kafka producer.
    err = kafka.InitKafka(appConfig.kafkaAddr)
    if err != nil {
        // Name the component that actually failed; this branch used to
        // report a tail failure.
        logs.Error("init kafka failed, err:%v", err)
        return
    }

    logs.Debug("initialize all succ")

    err = serverRun()
    if err != nil {
        logs.Error("serverRUn failed, err:%v", err)
        return
    }

    logs.Info("program exited")
}

config.go

package main


import(
    "fmt"
    "errors"
    "github.com/astaxie/beego/config"
    "logagent/tailf"
)

// appConfig is the package-level configuration object. loadConf assigns it,
// and initLogger and main read it afterwards.
var (
    appConfig *Config
)

// Config groups every setting the agent reads from its configuration file.
type Config struct {
    // logLevel comes from logs::log_level ("debug" when the key is empty).
    logLevel string
    // logPath comes from logs::log_path and is the file beego/logs writes to.
    logPath string

    // chanSize comes from collect::chan_size (100 when it cannot be parsed)
    // and is passed to tailf.InitTail as the message channel capacity.
    chanSize int
    // kafkaAddr comes from kafka::server_addr and is passed to kafka.InitKafka.
    kafkaAddr string
    // collectConf lists the log files to tail together with their topics.
    collectConf []tailf.CollectConf
}

// loadCollectConf reads the [collect] section (log_path and topic) from conf
// and appends the resulting entry to appConfig.collectConf.
//
// It returns an error when either key is missing or empty. appConfig must
// already be non-nil when this is called.
func loadCollectConf(conf config.Configer) (err error) {
    var cc tailf.CollectConf

    cc.LogPath = conf.String("collect::log_path")
    if len(cc.LogPath) == 0 {
        err = errors.New("invalid collect::log_path")
        return
    }

    cc.Topic = conf.String("collect::topic")
    // Validate the topic itself; checking LogPath again here would let an
    // empty topic through.
    if len(cc.Topic) == 0 {
        err = errors.New("invalid collect::topic")
        return
    }

    appConfig.collectConf = append(appConfig.collectConf, cc)
    return
}

// loadConf parses the configuration file filename with the beego config
// adapter confType and stores the result in the package-level appConfig.
//
// Expected file layout:
//
//    [logs]
//    log_level=debug
//    log_path=E:\golang\go_pro\logs\logagent.log
//    [collect]
//    log_path=E:\golang\go_pro\logs\logagent.log
//    topic=nginx_log
//    chan_size=100
//    [kafka]
//    server_addr=192.168.21.8:9092
//
// Missing log_level, log_path and chan_size fall back to defaults. A missing
// kafka address or an invalid [collect] section is returned as an error, as is
// a failure to open or parse the file.
func loadConf(confType, filename string) (err error) {
    conf, err := config.NewConfig(confType, filename)
    if err != nil {
        fmt.Println("new config failed, err:", err)
        return
    }

    // Publish the config object first: loadCollectConf appends to it.
    appConfig = &Config{}

    appConfig.logLevel = conf.String("logs::log_level")
    if len(appConfig.logLevel) == 0 {
        appConfig.logLevel = "debug"
    }

    appConfig.logPath = conf.String("logs::log_path")
    if len(appConfig.logPath) == 0 {
        appConfig.logPath = "E:\\golang\\go_pro\\logs\\logagent.log"
    }

    // Use a local error so a parse failure that is handled by the default does
    // not linger in the named result. A negative size is also replaced by the
    // default because make(chan, n) panics for n < 0.
    chanSize, intErr := conf.Int("collect::chan_size")
    if intErr != nil || chanSize < 0 {
        chanSize = 100
    }
    appConfig.chanSize = chanSize

    appConfig.kafkaAddr = conf.String("kafka::server_addr")
    if len(appConfig.kafkaAddr) == 0 {
        err = fmt.Errorf("invalid kafka addr")
        return
    }

    err = loadCollectConf(conf)
    if err != nil {
        fmt.Printf("load collect conf failed, err:%v\n", err)
        return
    }
    return
}

log.go

package main


import (
    "encoding/json"
    "fmt"
    "github.com/astaxie/beego/logs"
)

// convertLogLevel maps a textual level from the configuration file to the
// matching beego/logs level constant. Recognised names are "error", "warn",
// "info", "trace" and "debug"; anything else falls back to logs.LevelDebug.
func convertLogLevel(level string) int {
    switch level {
    case "error":
        // Previously unhandled, so log_level=error silently meant debug.
        return logs.LevelError
    case "warn":
        return logs.LevelWarn
    case "info":
        return logs.LevelInfo
    case "trace":
        return logs.LevelTrace
    case "debug":
        return logs.LevelDebug
    default:
        return logs.LevelDebug
    }
}

// initLogger configures beego/logs to write to the file named in
// appConfig.logPath at the level named in appConfig.logLevel.
//
// It returns an error when the adapter configuration cannot be marshalled to
// JSON or when beego/logs rejects the file adapter configuration.
func initLogger() (err error) {
    config := make(map[string]interface{})
    config["filename"] = appConfig.logPath
    config["level"] = convertLogLevel(appConfig.logLevel)

    configStr, err := json.Marshal(config)
    if err != nil {
        fmt.Println("initLogger failed, marshal err:", err)
        return
    }

    // SetLogger reports a failure through its return value; propagate it
    // instead of pretending the file logger is ready.
    if err = logs.SetLogger(logs.AdapterFile, string(configStr)); err != nil {
        err = fmt.Errorf("set file logger with config %s: %v", configStr, err)
        return
    }

    // Example output: {"filename":"E:\\golang\\go_pro\\logs\\logagent.log","level":7}
    fmt.Println(string(configStr))
    return
}

tailf.go

package tailf

import (
    "github.com/hpcloud/tail"
    "github.com/astaxie/beego/logs"
    "fmt"
    "time"
)

// CollectConf is one collect entry: the log file to tail (LogPath) and the
// topic its lines are published under (Topic).
// Printed with %v a value looks like {E:\golang\go_pro\logs\logagent.log nginx_log}.
type CollectConf struct {
    LogPath, Topic string
}
// TailObj pairs one tail handle with the collect entry it was created for.
type TailObj struct {
    // tail is the handle returned by tail.TailFile; its Lines channel is
    // consumed by readFromTail.
    tail *tail.Tail
    // conf is the path/topic entry this tail was opened for.
    conf CollectConf
}

// TextMsg is one collected log line (Msg) together with the topic it should be
// published under (Topic).
type TextMsg struct {
    Msg, Topic string
}

// TailObjMgr keeps every TailObj created by InitTail plus the channel their
// reader goroutines publish to.
type TailObjMgr struct {
    // tailObjs holds one entry per tailed file, appended by InitTail.
    tailObjs []*TailObj
    // msgChan receives messages from readFromTail and is drained by GetOneLine.
    msgChan chan *TextMsg
}

// tailObjMgr is the package-level manager. InitTail assigns it; GetOneLine and
// readFromTail use its message channel, so InitTail must run first.
var (
    tailObjMgr* TailObjMgr
)

// GetOneLine blocks until a message is available on the manager's channel and
// returns it.
func GetOneLine() (msg *TextMsg) {
    return <-tailObjMgr.msgChan
}
// InitTail creates the package-level manager with a message channel of
// capacity chanSize, opens a tail on every entry of conf and starts one reader
// goroutine per entry.
//
// It returns an error when conf is empty or when a file cannot be tailed; in
// the latter case tails opened for earlier entries keep running.
func InitTail(conf []CollectConf, chanSize int) (err error) {
    if len(conf) == 0 {
        return fmt.Errorf("invalid config for log collect, conf:%v", conf)
    }

    tailObjMgr = &TailObjMgr{msgChan: make(chan *TextMsg, chanSize)}

    // The same options apply to every file. Location is deliberately not set
    // (the original kept a seek-to-end option commented out).
    tailCfg := tail.Config{
        ReOpen:    true,
        Follow:    true,
        MustExist: false,
        Poll:      true,
    }

    // conf looks like [{E:\golang\go_pro\logs\logagent.log nginx_log}]
    for i := range conf {
        entry := conf[i]
        // Prints e.g. v--- {E:\golang\go_pro\logs\logagent.log nginx_log}
        fmt.Println("v---", entry)

        tailer, tailErr := tail.TailFile(entry.LogPath, tailCfg)
        if tailErr != nil {
            return tailErr
        }

        obj := &TailObj{tail: tailer, conf: entry}
        tailObjMgr.tailObjs = append(tailObjMgr.tailObjs, obj)

        go readFromTail(obj)
    }

    return nil
}

// readFromTail forwards every line received from tailObj's Lines channel to
// the manager's message channel, tagged with the topic configured for that
// file. It never returns.
//
// NOTE(review): when Lines is reported closed the loop only warns, sleeps
// 100ms and receives again; if the tail library never reopens that channel
// this repeats forever — confirm against the tail package and consider
// returning instead.
func readFromTail(tailObj *TailObj) {
    for {
        line, open := <-tailObj.tail.Lines
        if open {
            tailObjMgr.msgChan <- &TextMsg{
                Msg:   line.Text,
                Topic: tailObj.conf.Topic,
            }
            continue
        }

        logs.Warn("tail file close reopen, filename:%s\n", tailObj.tail.Filename)
        time.Sleep(100 * time.Millisecond)
    }
}

kafka.go

package kafka

import(
    "github.com/Shopify/sarama"
    "github.com/astaxie/beego/logs"
)

// client is the package-level synchronous Kafka producer. InitKafka assigns
// it and SendToKafka sends through it.
var (
    client sarama.SyncProducer 
)

// InitKafka creates the package-level synchronous producer for the broker at
// addr. The producer waits for all acks, picks partitions randomly and
// reports successes. The error from sarama is logged and returned.
func InitKafka(addr string) (err error) {
    producerConf := sarama.NewConfig()
    producerConf.Producer.Return.Successes = true
    producerConf.Producer.Partitioner = sarama.NewRandomPartitioner
    producerConf.Producer.RequiredAcks = sarama.WaitForAll

    brokers := []string{addr}
    if client, err = sarama.NewSyncProducer(brokers, producerConf); err != nil {
        logs.Error("init kafka producer failed, err:", err)
        return
    }

    // Record that this step finished.
    logs.Debug("init kafka succ")
    return
}

// SendToKafka publishes data to topic through the package-level producer.
// It logs the partition and offset on success, logs the failure otherwise,
// and returns the producer's error.
func SendToKafka(data, topic string) (err error) {
    message := &sarama.ProducerMessage{
        Topic: topic,
        Value: sarama.StringEncoder(data),
    }

    partition, offset, err := client.SendMessage(message)
    if err == nil {
        logs.Debug("send succ, pid:%v offset:%v, topic:%v\n", partition, offset, topic)
        return
    }

    logs.Error("send message failed, err:%v data:%v topic:%v", err, data, topic)
    return
}

server.go

package main
import(
    "logagent/tailf"
    "logagent/kafka"
    "github.com/astaxie/beego/logs"
    "time"
    "fmt"
)


// serverRun is the forwarding loop: it takes one message at a time from the
// tailf channel, echoes it to stdout and sends it to Kafka. A failed send is
// logged and followed by a one-second pause before the next message is taken.
// The loop has no exit, so the function never returns.
func serverRun() (err error) {
    for {
        // Take the next collected line out of the channel.
        line := tailf.GetOneLine()
        fmt.Println(line)

        if err = kafka.SendToKafka(line.Msg, line.Topic); err != nil {
            logs.Error("send to kafka failed, err:%v", err)
            time.Sleep(time.Second)
        }
    }
}

logagent.conf

[logs]
log_level=debug
log_path=E:\golang\go_pro\logs\logagent.log

[collect]
log_path=E:\golang\go_pro\logs\logagent.log
topic=nginx_log
chan_size=100

[kafka]
server_addr=192.168.21.8:9092