1. 程式人生 > >Go1.9按行讀取日誌檔案並處理

Go1.9按行讀取日誌檔案並處理

package main

import (
    "bufio"
    "bytes"
    "context"
    "log"
    "os"
    "sync"
)

const (
    logname    = "log"
    concurrent = 5                //併發處理數,可以根據實體記憶體調整
    maxsize    = 10 * 1024 * 1024 //每次讀取的大小,可以根據實體記憶體調整
)

var bufs = make([][]byte, concurrent)

func main() {
    var (
        chs         = make
(map[int]chan int) wait = new(sync.WaitGroup) chct = make(chan int, concurrent) ctx, cancel = context.WithCancel(context.Background()) ) File, err := os.Open(logname) if err != nil { log.Fatalf("Open file error,%s\n", err.Error()) } for
i := 0; i < concurrent; i++ { chct <- i chs[i] = make(chan int) bufs[i] = make([]byte, maxsize) go resolvectx(ctx, wait, i, chct, chs[i]) } var i, n, l int for i = range chct { n, err = File.Read(bufs[i]) if err != nil { wait.Wait() //等待資料全部處理完畢,然後返回
break } for s := 1; s < n; s++ { //如果行過長,那麼效率會變低 if bufs[i][n-s] == '\n' { n = n - s + 1 File.Seek(int64(l+n), 0) break } } l += n wait.Add(1) chs[i] <- n } cancel() close(chct) File.Close() } func resolvectx(ctx context.Context, wait *sync.WaitGroup, index int, chct, ch chan int) { var ( err error line []byte length int buf = bufio.NewReader(nil) ) for { select { case <-ctx.Done(): return case length = <-ch: buf.Reset(bytes.NewBuffer(bufs[index][:length])) for { line, _, err = buf.ReadLine() if err != nil { break } _ = line } chct <- index wait.Done() } } }