1. 程式人生 > >一個用Go寫的叢集資料分發工具

一個用Go寫的叢集資料分發工具

背景

在工作中遇到大資料(20G左右)批量部署的問題,剛開始通過scp或者rsync限速80M序列分發,大約每臺機器的耗時為5分鐘,極大的增加了大批量部署的時間和難度,各種難產。於是作者就花了一天多的時間做了個簡易的傳輸工具,現在還不太成熟,希望對大家有啟發,更希望大家多多提建議

思路

將傳輸完成的目標機器轉換成資料來源為其他機器提供資料來源服務。(相當於《行屍走肉》中的殭屍一樣,每一個被感染的個體都是傳染源)

語言選型

對於資料傳輸來說,個人認為語言不是瓶頸而是傳輸的方法(序列、並行還是其他),之所以選擇 go 是因為 go 可以編譯後扔到機器上直接執行(Python需要安裝各種依賴包增加了部署的難度)

依賴

由於公司機器都已安裝FTP服務且已配置限速下載,所以工具本身不提供下載服務,下載限速也是寫死在Client端的。

架構

C/S架構,即大家熟知的Client和Server結構。

Client:等待接受Server下載任務並執行。

Server:負責分發控制客戶端的下載以及接受客戶端的回傳資訊。

效果

第1次傳輸1臺,第2次傳輸2臺,第3次傳輸4臺……第n次傳輸2(n-1)

臺,也就是說傳輸1+2+4=7臺數據的時間相當於序列傳輸3臺機器的時間,並且傳輸的數量越多,相對序列傳輸時間越少。

 

廢話不多說,直接上Server端程式碼。至於程式碼規範性,請大家忽略。程式碼以 GitHub 上為準。

[[email protected] src]# cat p2pserver.go
package main

import (
       "bufio"
       "flag"
       "fmt"
       "github.com/gin-gonic/gin"
       "github.com/parnurzeal/gorequest"
       "io"
       "os"
       "strconv"
       "strings"
       "time"
)

 //定義slice
 var (
       SuccList[]string //成功列表
       FailList[]string //失敗列表
       DstList  []string //目的列表
       //p2pConfig
       ListenPortstring
       Port       *string
       Server     *string
       Srcpath    *string
       Dstpath    *string
       Filename   *string
       Timeout    *int
 )
 var succ_ch = make(chan string, 10000)

 //讀取檔案,返回行 列表
 func Cat(filename string) []string {
       f,err := os.Open(filename)
       defer f.Close()
       if err != nil {     
           panic(err)
       }
       reader:= bufio.NewReader(f)    
       for{
           line,err := reader.ReadString('\n')
           if err == io.EOF {             
               break
           }
           line= strings.TrimSpace(line)              
           if line != "" {
               DstList= append(DstList, line)
           }
       }
       returnDstList
 }
       

 //接收客戶端返回的資訊
 func accept() {
       router:= gin.Default()
       router.GET("/p2p",func(c *gin.Context) {
              status,_ := strconv.ParseBool(c.Query("status")) //判斷客戶端是否下載成功
              host:= c.Query("host")
              src:= c.Query("src") //接受客戶端返回的資料來源
              //fmt.Println(status,host)
              if status {
                     succ_ch<- host
                     succ_ch<- src
                     SuccList= append(SuccList, host)                                     //fmt.Println(SuccList,FailList)
              }else {
                     fmt.Println("客戶端下載失敗:",host)
                     succ_ch<- src
              }
       })
       router.Run(ListenPort)
       //router.Run(":%s",Conf().Port)
 }
 //request向客戶端傳送下載任務
func request(Server, port, src, dst,srcpath, dstpath string) {
       req:= gorequest.New()
       url:= fmt.Sprintf("http://%s:12306/Client?port=%s&src=%s&srcpath=%s&dstpath=%s&Server=%s&localhost=%s",dst, port, src, srcpath, dstpath, Server, dst)
       fmt.Println(url)
       //_,_, errs := req.Get(url).Timeout(10*time.Second).End() //設定url超時時間為10妙
       _, _, errs := req.Get(url).End() //設定url超時時間為10妙
       iflen(errs) != 0 {
              fmt.Println("請求客戶端失敗:",dst)
              succ_ch<- src
       }
 }
 
 //主程式執行分發控制
 func handle(dstlist []string) {
       index:= 0
       lendst:= len(dstlist)
       for{       
           select{
              casesucchost := <-succ_ch:
                     if index < lendst {
                         request(*Server,*Port, succhost, dstlist[index], *Srcpath, *Dstpath)
                         index= index + 1
                     }else {
                        time.Sleep(time.Duration(*Timeout)* time.Second)
                        if lendst != len(SuccList) {    
                           FailList= Set(dstlist, SuccList)
                           fmt.Printf("成功機器數為%d,機器列表為,機器列表為%v\n",len(SuccList), SuccList)
                           fmt.Printf("失敗機器數為%d,機器列表為,機器列表為%v\n",len(FailList), FailList)
                     }else {
                           fmt.Printf("全部成功,成功機器數為%d,機器列表為,機器列表為%v\n",len(SuccList), SuccList)
                            }
                           fmt.Println("==============執行完畢================="
                            os.Exit(0)
                     }
              }
       }
 }
 //初始化
 func init() {    
     //解析commandline引數
       flag.Usage= func() {
              fmt.Println("Usage:<-m host> [-f file] [-s srcpath] [-d dstpath] [-p port] [-ttimeout]")
              flag.PrintDefaults()
       }
       Filename= flag.String("f", "ip.txt", "File that contains thetarget machine")
       Srcpath= flag.String("s", "/home/xiaoju", "Data sourcepath")
       Dstpath= flag.String("d", "/home/xiaoju", "Data destinationpath")
       Port= flag.String("p", "12306", "Listen port")
       Server= flag.String("m", "", "ip or host name of the Server")
       Timeout= flag.Int("t", 1800, "If the Server does not receive the returnvalue within the specified time , that the transmission fails")
       flag.Parse()
       ListenPort= fmt.Sprintf(":%s", *Port)
       succ_ch<- *Server //將Server加入channel
 }
 //兩個slice取差集
 func Set(one, two []string) []string {
       x:= []string{}
       if len(two) != 0 {
            for_, v := range one {
                     fork, vv := range two {
                        ifv == vv {                              
                          break
                        }
                       if k == len(two)-1 {
                            x= append(x, v)
                            }
                     }
              }
       }else {
              returnone
       }
       returnx
 }
 func main() { 
     if*Server == "" {
              flag.Usage()
              os.Exit(2)
       }
       goaccept()
       handle(Cat(*Filename))
}

客戶端程式碼,相關寫死的引數可以自行修改或者作為變數傳入

服務執行完自動退出,主要目的:防止誤操作,切斷伺服器後門。

[[email protected]]# cat p2pClient.go
/*
根據公司背景
客戶端基於ftp下載
下載限速為80M,且不能修改
cut-dirs預設是2,且不能修改
此服務下載完成後會自動退出
*/

package main

import ( 
 "github.com/gin-gonic/gin"
 "github.com/parnurzeal/gorequest"
 "fmt"
 "os/exec"
 "sync"

)

var (
    Url   string
    Status string
    wg     sync.WaitGroup
)

func main() {
   wg.Add(1)
   go accept()
   wg.Wait()
   request := gorequest.New()
   _, _,errs := request.Get(Url).End()
   if len(errs) != 0 {
       fmt.Println("訪問Server失敗")
    }
 }
 
 //接收Server請求
func accept() {
    router := gin.Default()
    router.GET("/Client",func(c *gin.Context){
        src := c.Query("src")
        Server:= c.Query("Server")
        srcpath := c.Query("srcpath")
        dstpath := c.Query("dstpath")
        localhost := c.Query("localhost")
        port := c.Query("port") 
        go  wget(src,srcpath,dstpath,Server,localhost,port)
        c.String(200,"客戶端返回")
   })
    router.Run(":12306")
}
 
 func wget(src,srcpath,dstpath,Server,localhost,portstring) {
   defer wg.Done()
   download := fmt.Sprintf("wget -m -r -nH -P %s --limit-rate=80m--cut-dirs=2 ftp://%s%s ",dstpath,src,srcpath)
   fmt.Println(download)
   _,err :=exec.Command("sh","-c",download).CombinedOutput()
   if err != nil {
        Status = "false"
    }else {
        Status = "true"
    }
   fmt.Println(src,Server,dstpath,localhost)
   Url =fmt.Sprintf("http://%s:%s/p2p?host=%s&status=%s&src=%s",Server,port,localhost,Status,src)

}

PS0至於埠號是徵用以前同事的,好記並且用的人少。相關問題歡迎留言

PS1完整程式碼請關注 https://github.com/51reboot/spore

作者為51Reboot架構師班第4期優秀學員,版權歸作者所有

僅授權51Reboot公眾號、部落格等釋出

轉載請聯絡作者獲得授權,並請註明出處