1. 程式人生 > >從零開始編寫一個BitTorrent下載器

從零開始編寫一個BitTorrent下載器

# 從零開始編寫一個BitTorrent下載器 ## BT協議 ### 簡介 > BT協議Bit Torrent(BT)是一種通訊協議,又是一種應用程式,廣泛用於對等網路通訊(P2P)。曾經風靡一時,由於它引起了巨大的流量,對因特網的運營、維護和管理都產生了重要的影響。 BT協議的典型特徵就是沒有中心伺服器。BT協議中,作為參與者的機器被稱為**peers**。**peer**之間的通訊協議又被稱為**peer wire protocal**,即**peer連線協議**,是一個基於TCP協議的應用層協議。 BT協議在20年裡不斷髮展(從2001年開始),加入加密、私有種子等設計,也擴充套件了搜尋peer主機的方法。 ### 連線 由於沒有中心伺服器,參與者需要使用另外的方法取得他人的地址,以建立對等連線,確定自己的機器應當從何處下載需要的檔案。傳統的BT協議使用中介伺服器**trackers**來告知每個參與者如何進行下載。**trackers**伺服器是基於HTTP的,這類伺服器本身不託管檔案資源,僅為每個參與者分配peers。 在BT協議網路中傳播違法資源的現象十分常見,這導致其中介伺服器常常會受到法律制裁,查封事件屢見不鮮。要解決這一問題,就需要將主機搜尋的工作下放到每個參與者的機器,即**分散式處理(distributed process)**。BT協議未來的核心就是[DHT](https://baike.baidu.com/item/DHT/1007999)、PEX、[磁力鏈](https://baike.baidu.com/item/磁力連結)。 ## \.torrent檔案解析 以debian釋出的映象檔案種子為例。 ![image]() 一個\.torrent檔案描述了可下載檔案的內容以及需要連線到的tracker中介伺服器的資訊,其編碼格式為**Bencode**。 檔案的頭部資訊可以直接以文字形式檢視: ``` d8:announce41:http://bttracker.debian.org:6969/announce7:comment35:"Debian CD from cdimage.debian.org"13:creation datei1612616380e9:httpseedsl146:https://cdimage.debian.org/cdimage/release/edu//srv/cdbuilder.debian.org/dst/deb-cd/weekly-builds/amd64/iso-cd/debian-edu-10.8.0-amd64-netinst.iso146:https://cdimage.debian.org/cdimage/archive/edu//srv/cdbuilder.debian.org/dst/deb-cd/weekly-builds/amd64/iso-cd/debian-edu-10.8.0-amd64-netinst.isoe4:infod6:lengthi425721856e4:name35:debian-edu-10.8.0-amd64-netinst.iso12:piece lengthi262144e6:pieces32480:[每個部分的hash,以二進位制表示] ``` 之後的內容為二進位制,無法直接檢視。 美化一下這個部分的資訊,可以發現清晰的結構特徵: ```json d 8:announce 41:http://bttracker.debian.org:6969/announce 7:comment 35:"Debian CD from cdimage.debian.org" 13:creation date i1612616380e 9:httpseeds l 146:https://cdimage.debian.org/cdimage/release/edu//srv/cdbuilder.debian.org/dst/deb-cd/weekly-builds/amd64/iso-cd/debian-edu-10.8.0-amd64-netinst.iso 146:https://cdimage.debian.org/cdimage/archive/edu//srv/cdbuilder.debian.org/dst/deb-cd/weekly-builds/amd64/iso-cd/debian-edu-10.8.0-amd64-netinst.iso e 4:info d 6:length i425721856e 4:name 35:debian-edu-10.8.0-amd64-netinst.iso 12:piece length i262144e 6:pieces 32480:[每個部分的hash,以二進位制表示] e e ``` 其中包含了tracker伺服器的URL、建立事件(Unix時間戳)、檔名和檔案大小、以及一系列表示每個檔案塊的**SHA\-1**雜湊值的二進位制片段(檔案塊是指檔案被等量拆分後形成的幾個部分)。每個種子中檔案被拆分的大小依據是不同的,但基本處在**一個區間內(256KB到1MB)**。因為這樣的設計,大型檔案將會被拆分成眾多碎片。在實際下載中,下載執行者會從能夠連線的那些peers主機下載檔案塊,並且根據種子檔案校驗其雜湊值,最後拼接成完整的檔案。 這種機制能夠確保每個檔案塊的完整性,抵禦裝置故障或惡意投毒(torrent poisoning)造成的損害。如果攻擊者不能破解SHA\-1進行[原像攻擊](https://zh.wikipedia.org/wiki/原像攻擊)(preimage attack),那麼下載取得的檔案就是安全可靠的。 ### Bencode編碼 從已知的資訊可以看出,\.torrent檔案中的元資料均以“鍵:值”形式儲存,故可以將整個內容理解為一個經過特殊編碼的字典,或者一個近似的JSON。 Bencode中,數字採用十進位制編碼,相比純二進位制編碼顯得效率較低,但保證了良好的跨平臺性(無大小端儲存問題)。 Bencode支援四種類型的資料:string、int、Dictionary\、List\。 - **string型別** string型別的編碼格式為**\[length\]:\[string\]**,以字串長度開頭,以字串內容結束。示例: ``` "abc" => 3:abc ``` - **int型別** int型別的編碼格式為**i\[int\]e**,以i開頭,以e結尾。示例: ``` 123 => i123e ``` - **Dictionary\型別** Dictionary\型別的編碼格式為**d\[Key\-Value Pair\]e**,以d開頭,以e結尾。示例: ``` Dictionary<{"name":"create chen"},{"age":23}> => d4:name11:create chen3:agei23ee ``` - **List\型別** List\型別的編碼格式為**l\[object\]e**,以l開頭,以e結尾。示例: ``` List<"abc", 123> => l3:abci123ee ``` ### Bencode實現 #### 編碼 ```java public static string Encode(object obj) { var sb = new StringBuilder(); if(obj is Dictionary) { var parseObj = obj as Dictionary; sb.Append("d"); foreach (var o in parseObj) { sb.AppendFormat("{0}:{1}{2}", o.Key.Length,o.Key, Encode(o.Value)); } sb.Append("e"); } if ((obj as int?) != null) { var parseObj = (int) obj; sb.AppendFormat("i{0}e", parseObj); } if (obj is List) { var parseObj = obj as List; sb.Append("l"); foreach (var o in parseObj) { sb.Append(Encode(o)); } sb.Append("e"); } if (obj is string) { var parseObj = obj as string; sb.AppendFormat("{0}:{1}", parseObj.Length, parseObj); } return sb.ToString(); } ``` #### 解碼 ```java public static object Decode(string s) { return DecodeObject(s, ref _index, EncodeState.Value); } private enum EncodeState { Key, Value } private static int _index; private static object DecodeObject(string str,ref int index, EncodeState state) { var obj = new Dictionary(); var c = str[index]; while (c != 'e') { if (c == 'd') { index++; return DecodeObject(str, ref index,EncodeState.Key); } if (c == 'i') { var value = ""; index++; c = str[index]; while (c != 'e') { value += c.ToString(CultureInfo.InvariantCulture); index++; c = str[index]; } return Convert.ToInt32(value); } if (c == 'l') { index++; var value = new List(); while (str[index]!='e') { value.Add(DecodeObject(str, ref index, EncodeState.Value)); index++; } return value; } if ('0' < c && c <= '9') { string strLength = ""; while (c != ':') { strLength += c.ToString(CultureInfo.InvariantCulture); c = str[++index]; } var length = Convert.ToInt32(strLength); var strContent = ""; for (int i = 0; i < length; i++) { strContent += str[index + 1].ToString(CultureInfo.InvariantCulture); index++; } if (state == EncodeState.Value) { return strContent; } index++; obj.Add(strContent, DecodeObject(str, ref index, EncodeState.Value)); state = EncodeState.Key; index++; } c = str[index]; } return obj; } ``` ## 編寫專案 這裡使用Go來編寫,也是首次使用Go完成網路工具。僅包含主要程式碼,完整專案見[Github](https://github.com/4thrun/Tiny-BT-Client)。 ### 尋找 #### 解析種子(\~/torrentfile/torrentfile\.go) ```go import ( "github.com/jackpal/bencode-go" ) ``` 這裡省略了自帶庫檔案的匯入。 ```go type bencodeInfo struct { Pieces string `bencode:"pieces"` PieceLength int `bencode:"piece length"` Length int `bencode:"length"` Name string `bencode:"name"` } type bencodeTorrent struct { Announce string `bencode:"announce"` Info bencodeInfo `bencode:"info"` } ``` ```go // Open函式用於解析種子 func Open(path string) (TorrentFile, error) { file, err := os.Open(path) if err != nil { return TorrentFile{}, err } defer file.Close() bto := bencodeTorrent{} err = bencode.Unmarshal(file, &bto) if err != nil { return TorrentFile{}, err } return bto.toTorrentFile() } ``` 處理時,將**pieces**對應的值(原先為雜湊值的字串)變成雜湊值切片(每個長度為20 bytes),以便後續呼叫每個獨立的雜湊值。另外,計算**info**對應的整個字典(含有名稱、大小、檔案塊雜湊值)的SHA\-1雜湊值,儲存在**infohash**,在與trackers伺服器和peers主機互動時表示所需的檔案。 ```go type TorrentFile struct { Announce string InfoHash [20]byte PieceHashes [][20]byte PieceLength int Length int Name string } ``` ```go func (bto bencodeTorrent) toTorrentFile() (TorrentFile, error) { // ... } ``` #### 從trackers伺服器獲取peers主機地址(\~/torrentfile/tracker\.go) 處理完種子後,就可以向trackers伺服器發起請求:作為一臺peer主機,需要獲取同一網路中的其它peers主機的列表。只需要對**announce**對應URL發起GET請求(需要設定幾個請求引數)。 ```go // buildTrackerURL函式用於構成請求peers列表的序列 func (t * TorrentFile) buildTrackerURL(peerID [20]byte, port uint16) (string, error) { base, err:= url.Parse(t.Announce) if err != nil { return "", err } params := url.Values{ "info_hash": []string{string(t.InfoHash[:])}, "peer_id": []string{string(peerID[:])}, "port": []string{strconv.Itoa(int(port))}, "uploaded": []string{"0"}, "downloaded": []string{"0"}, "compact": []string{"1"}, "left": []string{strconv.Itoa(t.Length)}, } base.RawQuery = params.Encode() return base.String(), nil } ``` 其中重要的引數有: - **info\_hash**:用以標識需要下載的檔案,其值就是之前由**info**對應值計算出的**infohash**。trackers伺服器基於這個值返回能夠為下載提供資源的peers主機。 - **peer\_id**:20位元組長的資料,用於向peers主機和trackers伺服器標識自己的身份。具體實現僅僅是產生隨機的20個位元組。真實的BitTorrent客戶端ID形如`-TR2940-k8hj0wgej6ch`,標出了客戶端軟體及其版本(`TR2940`表示Transmission Client 2.94)。 #### 處理trackers伺服器的響應(\~/peers/peers\.go) 伺服器響應也是採用Bencode編碼的: ```json d 8:interval i900e 5:peers 252:[很長的二進位制塊] e ``` `interval`表示本地應當在多長的時間間隔後再次向tracker伺服器請求以重新整理peers主機列表,900的單位是秒。`peers`包含了每個peer主機的IP地址,以二進位制表示,由若干個**6位元組元**組成,前4個位元組表示主機IP,後2個位元組表示埠號(大端儲存的16位無符號整型,uint16)。大端儲存,即**big\-endian**,是網路中所採用的儲存方式(相對於小端儲存),故被稱為**network order**。運算時可以直接將一組位元組從左至右拼接以形成所要表達的整數,如`0x1A`和`0xE1`能拼接成`0x1AE1`,即十進位制的6881。 ```go type Peer struct { IP net.IP Port uint16 } ``` ```go // Unmarshal函式從緩衝區解析IP及其埠 func Unmarshal(peerBin []byte)([]Peer, error) { const peerSize = 6 numPeers := len(peerBin) / peerSize if len(peerBin) % peerSize != 0 { err := fmt.Errorf("received malformed peers") return nil, err } peers := make([]Peer, numPeers) for i := 0; i < numPeers ; i++ { offset := i * peerSize peers[i].IP = net.IP(peerBin[offset : offset+4]) peers[i].Port = binary.BigEndian.Uint16(peersBin[offset+4 : offset+6]) } return peers, nil } ``` ### 下載 在取得peers主機的地址後,就可以進行下載了。對每臺peer主機的連線,有如下的幾個步驟: 1. 與目標peer建立TCP連線; 2. 完成BitTorrent握手; 3. 交換資訊(告知對方本地需要的資源)。 #### TCP連線(\~/client/client\.go) 設定一個超時檢測機制,防止消耗過多網路資源。 ```go conn, err := net.DialTimeout("tcp", peer.String(), 3*time.Second) if err != nil { return nil, err } ``` #### 握手(\~/handshake/handshake\.go) 通過達成握手,以確定某peer主機具有期望的功能: - 能夠使用BT協議通訊; - 能夠理解本機發出的資訊,並作出響應; - 持有本機需要的檔案資源,或者持有檔案資源在網路中位置的索引。 BitTorrent握手行為需要傳輸的資訊由5個部分構成: 1. 協議標識(表明這是BitTorrent協議)的長度,即19,十六進位制表示為`0x13`; 2. 協議標識,被稱為**pstr**,即`BitTorrent protocol`; 3. 8個保留位元組,預設全為0,如果客戶端支援BT協議的某些擴充套件,則需要將其中一些設定為1; 4. **infohash**,基於種子中**info**對應的全部資訊計算得出的雜湊值,用於標明本機需要的檔案; 5. **PEER ID**,用於標明本機身份。 這些資訊組合起來,就是達成握手需要的序列: ``` \x13BitTorrent protocol\x00\x00\x00\x00\x00\x00\x00\x00\x86\xd4\xc8\x00\x24\xa4\x69\xbe\x4c\x50\xbc\x5a\x10\x2c\xf7\x17\x80\x31\x00\x74-TR2940-k8hj0wgej6ch ``` 本機發出這些資訊後,peers主機應當以相同形式響應,且返回的**infohash**應當與本機持有的一致。 使用一個結構體表示握手包,並新增一些序列化、讀取函式。 ```go // 握手包結構體 type Handshake struct { Pstr string InfoHash [20]byte PeerID [20]byte } ``` ```go //Serialize函式用於序列化握手資訊 func (h *Handshake) Serialize() []byte { buf := make([]byte, len(h.Pstr)+49) buf[0] = byte(len(h.Pstr)) curr := 1 curr += copy(buf[curr:], h.Pstr) curr += copy(buf[curr:], make([]byte, 8)) //即8個保留位元組 curr += copy(buf[curr:], h.InfoHash[:]) curr += copy(buf[curr:], h.PeerID[:]) return buf } func Read(r io.Reader) (* Handshake, error) { // ... } ``` #### 資訊 完成握手後就將開始正式的收發資訊。如果遠端的peers主機未能做好收發的準備,本機仍舊無法傳送資訊,此時本機會被遠端認定為**阻塞的(choked)**。在peers主機完成準備後,會向本機發送**解除阻塞(unchoke)**資訊。程式碼設計中,預設需要傑出阻塞才能進行下載。 ##### 解析(\~/message/message\.go) 資訊包含三個部分:長度、ID、payload。 長度為32位整型,是大端儲存形式的4個位元組。ID用以表示資訊型別,這在程式碼中進行了詳細定義。 ```go type messageID uint8 const ( // MsgChoke表示阻塞 MsgChoke messageID = 0 // MsgUnchoke表示解除阻塞 MsgUnchoke messageID = 1 // MsgInterested表示資訊相關 MsgInterested messageID = 2 // MsgNotInterested表示資訊不相關 MsgNotInterested messageID = 3 // MsgHave表示提醒接收者,傳送者擁有資源 MsgHave messageID = 4 // MsgBitfield表示傳送者擁有資源的哪些部分 MsgBitfield messageID = 5 // MsgRequest表示向接收方請求資料 MsgRequest messageID = 6 // MsgPiece表示傳送資料以完成請求 MsgPiece messageID = 7 // MsgCancel表示取消一個請求 MsgCancel messageID = 8 ) //Message結構體儲存ID和包含資訊的payload type Message struct { ID messageID Payload []byte } ``` ```go // Serialize函式用於執行序列化 // 資訊依次為字首、資訊的ID、payload // 需要將`nil`解釋為`keep-alive` func (m *Message) Serialize() []byte { if m == nil { return make([]byte, 4) } length := uint32(len(m.Payload) + 1) buf := make([]byte, 4+length) binary.BigEndian.PutUint32(buf[0:4], length) buf[4] = byte(m.ID) copy(buf[5:], m.Payload) return buf } ``` 為讀取資訊,也需要依照資訊格式編寫函式。先讀取4個位元組並作為一個`uint32`以表示長度**length**,然後依據這個數字讀取相應位數的資料,這部分中的第一個位元組表示**ID**,剩下的表示**payload**。 ```go // Read函式用於解析資訊 func Read(r io.Reader) (*Message, error) { lengthBuf := make([]byte, 4) _, err := io.ReadFull(r, lengthBuf) if err != nil { return nil, err } length := binary.BigEndian.Uint32(lengthBuf) // keep-alive if length == 0 { return nil, nil } messageBuf := make([]byte, length) _, err = io.ReadFull(r, messageBuf) if err != nil { return nil, err } m := Message{ ID: messageID(messageBuf[0]), Payload: messageBuf[1:], } return &m, nil } ``` ##### 位域(\~/bitfield/bitfield\.go) peers主機使用位域來高效地編碼自身能夠提供的資源分塊。位域類似基於位元組的陣列,被標為1的位即代表擁有這個資源分塊。因為使用單個的位即能完成標註,位域有極高的壓縮能力,這意味著在一個布林(`bool`)空間內完成了8次布林型別的操作。 當然這樣的思路需要一定的代價:可以定址的最小記憶體單位是位元組,處理單個的位就需要額外的函式設計。 ```go // Bitfield用以表示一臺peer主機擁有的資源分塊 type Bitfield []byte ``` ```go // HasPiece用以表明一個位域(bitfield)是否有特定的索引集 func (bf Bitfield) HasPiece(index int) bool { byteIndex := index / 8 offset := index % 8 if byteIndex < 0 || byteIndex >= len(bf) { return false } return bf[byteIndex] >> uint(7-offset)&1 != 0 } // SetPiece用以在位域設定單個位 func (bf Bitfield) SetPiece(index int) { byteIndex := index / 8 offset := index % 8 // 撇除不合規的索引 if byteIndex < 0 || byteIndex >= len(bf) { return } bf[byteIndex] |= 1 << uint(7-offset) } ``` ### 組裝 至此完成了所有下載種子檔案的工具: - 從trackers伺服器獲得了peers主機列表; - 與peers主機達成TCP連線; - 與peers主機進行握手; - 與peers主機收發資訊。 現在面臨的問題是如何解決下載必然造成的**高併發(concurrency)**,並且需要統一管理每個連線的peer主機的**狀態(state)**。 #### 高併發(\~/p2p/p2p\.go) 在*Effective Go*中對併發的描述中有這樣一句話: >
Do not communicate by sharing memory; instead, share memory by communicating. [官網](https://golang.org/ref/mem)給出瞭解釋。 這裡將Go中重要的Channel型別作為簡潔且執行緒安全的佇列。Channel可以被認為是管道,通過併發核心單元就可以傳送或者接收資料進行通訊(communication)。 建立兩個Channel來同步併發工作:一個用於在peers主機間分派工作(要下載的資源分塊),另一個用於已下載的分塊。 ```go workQueue := make(chan *pieceWork, len(t.PieceHashes)) results := make(chan *pieceResult) for index, hash := range t.PieceHashes { length := t.calculatePieceSize(index) workQueue <- &pieceWork{index, hash, length} } // 執行下載 for _, peer := range t.Peers { go t.startDownloadWorker(peer, workQueue, results) } // 收集分塊 buf := make([]byte, t.Length) donePieces := 0 for donePieces < len(t.PieceHashes) { res := <- results begin, end := t.calculateBoundsForPiece(res.index) copy(buf[begin:end], res.buf) donePieces ++ percent := float64(donePieces) / float64(len(t.PieceHashes)) * 100 numWorkers := runtime.NumGoroutine() - 1 log.Printf("(%0.2f%%) downloaded piece #%d from %d peers\n", percent, res.index, numWorkers) } close(workQueue) ``` 為取得的每個peer主機都生成一個**goroutine(輕量級執行緒)**。每個執行緒連線peer主機並握手,然後從`workQueue`中抽取任務,嘗試進行下載,並把下載得到的分塊傳至名為`results`的**channel**。 可以用流程圖表示這個過程: ![image](