1. 程式人生 > >使用Go語言編寫區塊鏈P2P網路(譯)

使用Go語言編寫區塊鏈P2P網路(譯)

在之前的文章中,我們已經知道了怎麼編寫PoW也知道了IPFS怎麼工作, 但是有一個致命的缺點,我們的服務都是中心化的,這篇文章會教你怎麼實現一個簡單的完全去中心化的P2P網路。

背景知識

什麼是P2P網路

在真正的P2P架構中,不需要中心化的服務來維護區塊鏈的狀態。例如,當你給朋友傳送比特幣時,比特幣區塊鏈的“狀態”應該更新,這樣你朋友的餘額就會增加,你的餘額就會減少。

在這個網路中,不存在一個權力高度中心化的機構來維護狀態(銀行就是這樣的中心化機構)。對於比特幣網路來說,每個節點都會維護一份完整的區塊鏈狀態,當交易發生時,每個節點的區塊鏈狀態都會得到更新。這樣,只要網路中51%的節點對區塊鏈的狀態達成一致,那麼區塊鏈網路就是安全可靠的,具體可以閱讀這篇一致性協議

文章

本文將繼續之前的工作,200行Go程式碼實現區塊鏈, 並加入P2P網路架構。在繼續之前,強烈建議你先閱讀該篇文章,它會幫助你理解接下來的程式碼。

開始實現

編寫P2P網路可不是開開玩笑就能簡單視線的,有很多邊邊角角的情況都要覆蓋到,而且需要你擁有很多工程學的知識,這樣的P2P網路才是可擴充套件、高可靠的。有句諺語說得好:站在巨人肩膀上做事,那麼我們先看看巨人們提供了哪些工具吧。

喔,看看,我們發現了什麼!一個用Go語言實現的P2P庫go-libp2p!如果你對新技術足夠敏銳,就會發現這個庫的作者和IPFS的作者是同一個團隊。如果你還沒看過我們的IPFS教程,可以看看這裡, 你可以選擇跳過IPFS教程,因為對於本文這不是必須的。

警告

目前來說,go-libp2p主要有兩個缺點:
1. 安裝設定比較痛苦,它使用gx作為包管理工具,怎麼說呢,不咋好用,但是湊活用吧
2. 目前專案還沒有成熟,正在緊密鑼鼓的開發中,當使用這個庫時,可能會遇到一些資料競爭(data race)

對於第一點,不必擔心,有我們呢。第二點是比較大的問題,但是不會影響我們的程式碼。假如你在使用過程中發現了資料競爭問題,記得給專案提一個issue,幫助它更好的成長!

總之,目前開源世界中,現代化的P2P庫是非常非常少的,因為我們要多給go-libp2p一些耐心和包容,而且就目前來說,它已經能很好的滿足我們的目標了。

安裝設定

最好的環境設定方式是直接clone libp2p

庫,然後在這個庫的程式碼中直接開發。你也可以在自己的庫中,呼叫這個庫開發,但是這樣就需要用到gx了。這裡我們使用簡單的方式,假設你已經安裝了Go:
- go get -d github.com/libp2p/go-libp2p/…
- 進入go-libp2p資料夾
- make
- make deps

這裡會通過gx包管理工具下載所有需要的包和依賴,再次申明,我們不喜歡gx,因為它打破了Go語言的很多慣例,但是為了這個很棒的庫,認慫吧。

這裡,我們在examples子目錄下進行開發,因此在go-libp2p的examples下建立一個你自己的目錄
- mkdir ./examples/p2p

然後進入到p2p資料夾下,建立main.go檔案,後面所有的程式碼都會在該檔案中。

你的目錄結構是這樣的:

好了,勇士們,拔出你們的劍,哦不,拔出你們的main.go,開始我們的征途吧!

匯入相關庫

這裡申明我們需要用的庫,大部分庫是來自於go-libp2p本身的,在教程中,你會學到怎麼去使用它們。

package main

import (
    "bufio"
    "context"
    "crypto/rand"
    "crypto/sha256"
    "encoding/hex"
    "encoding/json"
    "flag"
    "fmt"
    "io"
    "log"
    mrand "math/rand"
    "os"
    "strconv"
    "strings"
    "sync"
    "time"

    "github.com/davecgh/go-spew/spew"
    golog "github.com/ipfs/go-log"
    libp2p "github.com/libp2p/go-libp2p"
    crypto "github.com/libp2p/go-libp2p-crypto"
    host "github.com/libp2p/go-libp2p-host"
    net "github.com/libp2p/go-libp2p-net"
    peer "github.com/libp2p/go-libp2p-peer"
    pstore "github.com/libp2p/go-libp2p-peerstore"
    ma "github.com/multiformats/go-multiaddr"
    gologging "github.com/whyrusleeping/go-logging"
)

spew包可以很方便、優美的打印出我們的區塊鏈,因此記得安裝它:
- go get github.com/davecgh/go-spew/spew

區塊鏈結構

記住,請先閱讀200行Go程式碼實現區塊鏈, 這樣,下面的部分就會簡單很多。

先來申明全域性變數:

// Block represents each 'item' in the blockchain
type Block struct {
    Index     int
    Timestamp string
    BPM       int
    Hash      string
    PrevHash  string
}

// Blockchain is a series of validated Blocks
var Blockchain []Block

var mutex = &sync.Mutex{}
  • 我們是一家健康看護公司,因此Block中存著的是使用者的脈搏速率BPM
  • Blockchain是我們的”狀態”,或者嚴格的說:最新的Blockchain,它其實就是Block的切片(slice)
  • mutex是為了防止資源競爭出現

下面是Blockchain相關的特定函式:

// make sure block is valid by checking index, and comparing the hash of the previous block
func isBlockValid(newBlock, oldBlock Block) bool {
    if oldBlock.Index+1 != newBlock.Index {
        return false
    }

    if oldBlock.Hash != newBlock.PrevHash {
        return false
    }

    if calculateHash(newBlock) != newBlock.Hash {
        return false
    }

    return true
}

// SHA256 hashing
func calculateHash(block Block) string {
    record := strconv.Itoa(block.Index) + block.Timestamp + strconv.Itoa(block.BPM) + block.PrevHash
    h := sha256.New()
    h.Write([]byte(record))
    hashed := h.Sum(nil)
    return hex.EncodeToString(hashed)
}

// create a new block using previous block's hash
func generateBlock(oldBlock Block, BPM int) Block {

    var newBlock Block

    t := time.Now()

    newBlock.Index = oldBlock.Index + 1
    newBlock.Timestamp = t.String()
    newBlock.BPM = BPM
    newBlock.PrevHash = oldBlock.Hash
    newBlock.Hash = calculateHash(newBlock)

    return newBlock
}
  • isBlockValid檢查Block的hash是否合法
  • calculateHash使用sha256來對原始資料做hash
  • generateBlock建立一個新的Block區塊,然後新增到區塊鏈Blockchain上,同時會包含所需的事務

P2P結構

下面我們快接近核心部分了,首先我們要寫出建立主機的邏輯。當一個節點執行我們的程式時,它可以作為一個主機,被其它節點連線。下面一起看看程式碼:-)

// makeBasicHost creates a LibP2P host with a random peer ID listening on the
// given multiaddress. It will use secio if secio is true.
func makeBasicHost(listenPort int, secio bool, randseed int64) (host.Host, error) {

    // If the seed is zero, use real cryptographic randomness. Otherwise, use a
    // deterministic randomness source to make generated keys stay the same
    // across multiple runs
    var r io.Reader
    if randseed == 0 {
        r = rand.Reader
    } else {
        r = mrand.New(mrand.NewSource(randseed))
    }

    // Generate a key pair for this host. We will use it
    // to obtain a valid host ID.
    priv, _, err := crypto.GenerateKeyPairWithReader(crypto.RSA, 2048, r)
    if err != nil {
        return nil, err
    }

    opts := []libp2p.Option{
        libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", listenPort)),
        libp2p.Identity(priv),
    }

    if !secio {
        opts = append(opts, libp2p.NoEncryption())
    }

    basicHost, err := libp2p.New(context.Background(), opts...)
    if err != nil {
        return nil, err
    }

    // Build host multiaddress
    hostAddr, _ := ma.NewMultiaddr(fmt.Sprintf("/ipfs/%s", basicHost.ID().Pretty()))

    // Now we can build a full multiaddress to reach this host
    // by encapsulating both addresses:
    addr := basicHost.Addrs()[0]
    fullAddr := addr.Encapsulate(hostAddr)
    log.Printf("I am %s\n", fullAddr)
    if secio {
        log.Printf("Now run \"go run main.go -l %d -d %s -secio\" on a different terminal\n", listenPort+1, fullAddr)
    } else {
        log.Printf("Now run \"go run main.go -l %d -d %s\" on a different terminal\n", listenPort+1, fullAddr)
    }

    return basicHost, nil
}

makeBasicHost函式有3個引數,同時返回一個host結構體
- listenPort是主機監聽的埠,其它節點會連線該埠
- secio表明是否開啟資料流的安全選項,最好開啟,因此它代表了”安全輸入/輸出”
- randSeed是一個可選的命令列標識,可以允許我們提供一個隨機數種子來為我們的主機生成隨機的地址。這裡我們不會使用

函式的第一個if語句針對隨機種子生成隨機key,接著我們生成公鑰和私鑰,這樣能保證主機是安全的。opts部分開始構建網路地址部分,這樣其它節點就可以連線進來。

!secio部分可以繞過加密,但是我們準備使用加密,因此這段程式碼不會被觸發。

接著,建立了主機地址,這樣其他節點就可以連線進來。log.Printf可以用來在控制檯打印出其它節點的連線資訊。最後我們返回生成的主機地址給呼叫方函式。

流處理

之前的主機需要能處理進入的資料流。當另外一個節點連線到主機時,它會想要提出一個新的區塊鏈,來覆蓋主機上的區塊鏈,因此我們需要邏輯來判定是否要接受新的區塊鏈。

同時,當我們往本地的區塊鏈新增區塊後,也要把相關資訊廣播給其它節點,這裡也需要實現相關邏輯。

先來建立流處理的基本框架吧:

func handleStream(s net.Stream) {

    log.Println("Got a new stream!")

    // Create a buffer stream for non blocking read and write.
    rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))

    go readData(rw)
    go writeData(rw)

    // stream 's' will stay open until you close it (or the other side closes it).
}

這裡建立一個新的ReadWriter,為了能支援資料讀取和寫入,同時我們啟動了一個單獨的Go協程來處理相關讀寫邏輯。

讀取資料

首先建立readData函式:

func readData(rw *bufio.ReadWriter) {

    for {
        str, err := rw.ReadString('\n')
        if err != nil {
            log.Fatal(err)
        }

        if str == "" {
            return
        }
        if str != "\n" {

            chain := make([]Block, 0)
            if err := json.Unmarshal([]byte(str), &chain); err != nil {
                log.Fatal(err)
            }

            mutex.Lock()
            if len(chain) > len(Blockchain) {
                Blockchain = chain
                bytes, err := json.MarshalIndent(Blockchain, "", "  ")
                if err != nil {

                    log.Fatal(err)
                }
                // Green console color:     \x1b[32m
                // Reset console color:     \x1b[0m
                fmt.Printf("\x1b[32m%s\x1b[0m> ", string(bytes))
            }
            mutex.Unlock()
        }
    }
}

該函式是一個無限迴圈,因為它需要永不停歇的去讀取外面進來的資料。首先,我們使用ReadString解析從其它節點發送過來的新的區塊鏈(JSON字串)。

然後檢查進來的區塊鏈的長度是否比我們本地的要長,如果進來的鏈更長,那麼我們就接受新的鏈為最新的網路狀態(最新的區塊鏈)。

同時,把最新的區塊鏈在控制檯使用一種特殊的顏色打印出來,這樣我們就知道有新連結受了。

如果在我們主機的本地添加了新的區塊到區塊鏈上,那就需要把本地最新的區塊鏈廣播給其它相連的節點知道,這樣這些節點機會接受並更新到我們的區塊鏈版本。這裡使用writeData函式:

func writeData(rw *bufio.ReadWriter) {

    go func() {
        for {
            time.Sleep(5 * time.Second)
            mutex.Lock()
            bytes, err := json.Marshal(Blockchain)
            if err != nil {
                log.Println(err)
            }
            mutex.Unlock()

            mutex.Lock()
            rw.WriteString(fmt.Sprintf("%s\n", string(bytes)))
            rw.Flush()
            mutex.Unlock()

        }
    }()

    stdReader := bufio.NewReader(os.Stdin)

    for {
        fmt.Print("> ")
        sendData, err := stdReader.ReadString('\n')
        if err != nil {
            log.Fatal(err)
        }

        sendData = strings.Replace(sendData, "\n", "", -1)
        bpm, err := strconv.Atoi(sendData)
        if err != nil {
            log.Fatal(err)
        }
        newBlock := generateBlock(Blockchain[len(Blockchain)-1], bpm)

        if isBlockValid(newBlock, Blockchain[len(Blockchain)-1]) {
            mutex.Lock()
            Blockchain = append(Blockchain, newBlock)
            mutex.Unlock()
        }

        bytes, err := json.Marshal(Blockchain)
        if err != nil {
            log.Println(err)
        }

        spew.Dump(Blockchain)

        mutex.Lock()
        rw.WriteString(fmt.Sprintf("%s\n", string(bytes)))
        rw.Flush()
        mutex.Unlock()
    }

}

首先是一個單獨協程中的函式,每5秒鐘會將我們的最新的區塊鏈狀態廣播給其它相連的節點。它們收到後,如果發現我們的區塊鏈比它們的要短,就會直接把我們傳送的區塊鏈資訊丟棄,繼續使用它們的區塊鏈,反之則使用我們的區塊鏈。總之,無論哪種方法,所有的節點都會定期的同步本地的區塊鏈到最新狀態。

這裡我們需要一個方法來建立一個新的Block區塊,包含之前提到過的脈搏速率(BPM)。為了簡化實現,我們不會真的去通過物聯網裝置讀取脈搏,而是直接在終端控制檯上輸入一個脈搏速率數字。

首先要驗證輸入的BPM是一個整數型別,然後使用之前的generateBlock來生成區塊,接著使用spew.Dump輸入到終端控制檯,最後我們使用rw.WriteString把最新的區塊鏈廣播給相連的其它節點。

牛逼了我的哥,現在我們完成了區塊鏈相關的函式以及大多數P2P相關的函式。在前面,我們建立了流處理,因此可以讀取和寫入最新的區塊鏈狀態;建立了狀態同步函式,這樣節點之間可以互相同步最新狀態。

剩下的就是實現我們的main函數了:

func main() {
    t := time.Now()
    genesisBlock := Block{}
    genesisBlock = Block{0, t.String(), 0, calculateHash(genesisBlock), ""}

    Blockchain = append(Blockchain, genesisBlock)

    // LibP2P code uses golog to log messages. They log with different
    // string IDs (i.e. "swarm"). We can control the verbosity level for
    // all loggers with:
    golog.SetAllLoggers(gologging.INFO) // Change to DEBUG for extra info

    // Parse options from the command line
    listenF := flag.Int("l", 0, "wait for incoming connections")
    target := flag.String("d", "", "target peer to dial")
    secio := flag.Bool("secio", false, "enable secio")
    seed := flag.Int64("seed", 0, "set random seed for id generation")
    flag.Parse()

    if *listenF == 0 {
        log.Fatal("Please provide a port to bind on with -l")
    }

    // Make a host that listens on the given multiaddress
    ha, err := makeBasicHost(*listenF, *secio, *seed)
    if err != nil {
        log.Fatal(err)
    }

    if *target == "" {
        log.Println("listening for connections")
        // Set a stream handler on host A. /p2p/1.0.0 is
        // a user-defined protocol name.
        ha.SetStreamHandler("/p2p/1.0.0", handleStream)

        select {} // hang forever
        /**** This is where the listener code ends ****/
    } else {
        ha.SetStreamHandler("/p2p/1.0.0", handleStream)

        // The following code extracts target's peer ID from the
        // given multiaddress
        ipfsaddr, err := ma.NewMultiaddr(*target)
        if err != nil {
            log.Fatalln(err)
        }

        pid, err := ipfsaddr.ValueForProtocol(ma.P_IPFS)
        if err != nil {
            log.Fatalln(err)
        }

        peerid, err := peer.IDB58Decode(pid)
        if err != nil {
            log.Fatalln(err)
        }

        // Decapsulate the /ipfs/<peerID> part from the target
        // /ip4/<a.b.c.d>/ipfs/<peer> becomes /ip4/<a.b.c.d>
        targetPeerAddr, _ := ma.NewMultiaddr(
            fmt.Sprintf("/ipfs/%s", peer.IDB58Encode(peerid)))
        targetAddr := ipfsaddr.Decapsulate(targetPeerAddr)

        // We have a peer ID and a targetAddr so we add it to the peerstore
        // so LibP2P knows how to contact it
        ha.Peerstore().AddAddr(peerid, targetAddr, pstore.PermanentAddrTTL)

        log.Println("opening stream")
        // make a new stream from host B to host A
        // it should be handled on host A by the handler we set above because
        // we use the same /p2p/1.0.0 protocol
        s, err := ha.NewStream(context.Background(), peerid, "/p2p/1.0.0")
        if err != nil {
            log.Fatalln(err)
        }
        // Create a buffered stream so that read and writes are non blocking.
        rw := bufio.NewReadWriter(bufio.NewReader(s), bufio.NewWriter(s))

        // Create a thread to read and write data.
        go writeData(rw)
        go readData(rw)

        select {} // hang forever

    }
}

首先是建立一個創世區塊(如果你讀了200行Go程式碼實現你的區塊鏈,這裡就不會陌生)。

其次我們使用go-libp2pSetAllLoggers日誌函式來記錄日誌。

接著,設定了所有的命令列標識:

  • secio之前有提到,是用來加密資料流的。在我們的程式中,一定要開啟該標識
  • target指明當前節點要連線到的主機地址
  • listenF是當前節點的監聽主機地址,這樣其它節點就可以連線進來,記住,每個節點都有兩個身份:主機和客戶端, 畢竟P2P不是白叫的
  • seed是隨機數種子,用來建立主機地址時使用

然後,使用makeBasicHost函式來建立一個新的主機地址,如果我們只想做主機不想做客戶端(連線其它的主機),就使用if *target == “”

接下來的幾行,會從target解析出我們要連線到的主機地址。然後把peerID和主機目標地址targetAddr新增到”store”中,這樣就可以持續跟蹤我們跟其它主機的連線資訊,這裡使用的是ha.Peerstore().AddAddr函式。

接著我們使用ha.NewStream連線到想要連線的節點上,同時為了能接收和傳送最新的區塊鏈資訊,建立了ReadWriter,同時使用一個Go協程來進行readDatawriteData

哇哦

終於完成了,寫文章遠比寫程式碼累!我知道之前的內容有點難,但是相比P2P的複雜性來說,你能通過一個庫來完成P2P網路,已經很牛逼了,所以繼續加油!

完整程式碼

執行結果

現在讓我們來試驗一下,首先開啟3個獨立的終端視窗做為獨立節點。

開始之前,請再次進入go-libp2p的根目錄執行一下make deps,確保所有依賴都正常安裝。

回到你的工作目錄examples/p2p,開啟第一個終端視窗,輸入
go run main.go -l 10000 -secio
終端1

細心的讀者會發現有一段話”Now run…”,那還等啥,繼續跟著做吧,開啟第二個終端視窗執行:go run main.go -l 10001 -d <given address in the instructions> -secio
終端2

這是你會發現第一個終端視窗檢測到了新連線!終端1

接著開啟第三個終端視窗,執行:go run main.go -l 10002 -d <given address in the instructions> -secio

終端3

檢查第二終端,又發現了新連線
終端2

接著,該我們輸入BPM資料了,在第一個終端視窗中輸入”70”,等幾秒中,觀察各個視窗的列印輸出。
終端1

終端2

終端3

來看看發生了什麼:
- 終端1向本地的區塊鏈添加了一個新的區塊Block
- 終端1向終端2廣播該資訊
- 終端2將新的區塊鏈跟本地的對比,發現終端1的更長,因此使用新的區塊鏈替代了本地的區塊鏈,然後將新的區塊鏈廣播給終端3
- 同上,終端3也進行更新

所有的3個終端節點都把區塊鏈更新到了最新版本,同時沒有使用任何外部的中心化服務,這就是P2P網路的力量!

我們再往終端2的區塊鏈中新增一個區塊試試看,在終端2中輸入”80”

終端2

終端1

終端3

結果忠誠的記錄了我們的正確性,再一次歡呼吧!

下一步

先享受一下自己的工作,你剛用了區區幾百行程式碼就實現了一個全功能的P2P網路!這不是開玩笑,P2P程式設計時非常複雜的,為什麼之前沒有相關的教程,就是因為太難了。

但是,這裡也有幾個可以改進的地方,你可以挑戰一下自己:

  • 之前提到過,go-libp2p是存在資料競爭的Bug的,因此如果你要在生產環境使用,需要格外小心。一旦發現Bug,請反饋給作者團隊知道
  • 嘗試將本文的P2P網路跟之前的共識協議結合,例如之前的文章PoWPoS (PoS是中文譯文)
  • 新增持久化儲存。截止目前,為了簡化實現,我們沒有實現持久化儲存,因此節點關閉,資料就丟失了
  • 學習一下節點發現技術。新節點是怎麼發現已經存在的節點的?這篇文章是一個很好的起點

如果我寫的任何文章曾在你的心裡蕩起漣漪,那至少說明在逝去的歲月裡,我們在某一刻,共同經歷著一樣的技術探索之路。
有時候,雖然素未謀面,卻已相識很久,很微妙也很知足。

想學習區塊鏈技術,可以搜尋公眾號優優區塊鏈課堂或者新增公眾微訊號uulesson
優優區塊鏈課堂