golang分散式儲存 讀書筆記(2)——流操作之PutStream封裝
接著上一篇的 golang分散式儲存 讀書筆記(1)——流操作之GetStream封裝 ,這次要講的是上傳檔案並儲存,使用 restful
的 PUT
方法,書中封裝了 PutStream
結構。
介面設計
客戶端上傳資料時向介面伺服器傳送 PUT
請求,請求的 url
為 /objects/<object_name>
。
同樣介面伺服器向資料伺服器轉發 PUT
請求,請求的 url
為 /objects/<object_name>
。資料伺服器在本地指定資料夾( D:/objects/
)下建立 <object_name>
檔案,將 PUT
的內容寫入檔案中。
目錄結構
在 GOPATH/src
目錄下,目錄結構為:
go-storage apiServer objects get.go put.go apiServer.go dataServer dataServer.go
資料伺服器實現
dataServer程式碼
資料伺服器的 put
介面和 get
介面很類似,只不過將讀檔案改為了寫檔案。
package main import ( "net/http" "io" "os" "log" "strings" ) const ( objectDir = "D:/objects/" ) func Handler(w http.ResponseWriter, r *http.Request) { m := r.Method log.Println(m) if m == http.MethodGet { // get(w, r) return } else if m == http.MethodPut { put(w, r) return } w.WriteHeader(http.StatusMethodNotAllowed) } func put(w http.ResponseWriter, r *http.Request) { // 提取檔名 fname := strings.Split(r.URL.EscapedPath(), "/")[2] log.Println(fname) // 建立檔案 f, e := os.Create(objectDir + fname) if e != nil { log.Println(e) w.WriteHeader(http.StatusInternalServerError) return } defer f.Close() // 往檔案寫入資料 io.Copy(f, r.Body) } func main() { http.HandleFunc("/objects/", Handler) http.ListenAndServe(":8889", nil) }
io.Copy(f, r.Body)
將 r.body
的資料寫入 f
中,其中 r.Body
實現了 io.ReadCloser
介面, f
實現了 io.Writer
介面。最後要記得關閉檔案。
測試
使用 Restlet Client
傳送 PUT
請求進行測試。

測試資料伺服器.png
往 http://localhost:8889/objects/1.txt
傳送 PUT
請求,可以看到本地確實生成了 1.txt
檔案。
介面伺服器實現
版本一
實現PUT請求
可以使用 http.NewRequest
構造一個 PUT
請求,使用 http.Client
構造一個客戶端進行傳送。
例如:
request, _ := http.NewRequest("PUT","http://127.0.0.1:8889/objects/"+object, reader) client := http.Client{} r, e := client.Do(request) // 傳送並接受請求
完整程式碼如下:
package main import ( "net/http" "io" "strings" "go-storage/apiServer/objects" "log" ) const dataServerAddr = "http://localhost:8889/objects/" func Handler(w http.ResponseWriter, r *http.Request) { m := r.Method if m == http.MethodGet { // get(w, r) return } else if m == http.MethodPut { put(w, r) return } w.WriteHeader(http.StatusMethodNotAllowed) } func put(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() object := strings.Split(r.URL.EscapedPath(), "/")[2] request, _ := http.NewRequest("PUT",dataServerAddr + object, r.Body) client := http.Client{} resp, e := client.Do(request) // 傳送並接受請求 if e == nil && resp.StatusCode != http.StatusOK { w.WriteHeader(http.StatusNotFound) log.Printf("dataServer return http code %d", resp.StatusCode) return } defer resp.Body.Close() io.Copy(w, resp.Body) } func main() { http.HandleFunc("/objects/", Handler) http.ListenAndServe(":8888", nil) }
測試

介面伺服器測試.png
往 http://localhost:8888/objects/2.txt
傳送 PUT
請求,可以看到本地確實生成了 2.txt
檔案。
版本二——封裝版本
書上的程式碼實現的相對複雜一點,將整個操作封裝成了一個 PutStream
的結構體,先看結構體的具體成員:
type PutStream struct { writer *io.PipeWriter cchan error }
可以看到其中一個成員是 io.PipeWriter
型別,這類似於 linux
中的管道,一端寫入,一讀取讀取,這裡是為了在兩個協程之間建立連線,這裡使用 channel
並不好使。
另一個成員是 channel
型別,因為子協程不能有返回值,所以這裡用通道傳遞錯誤。
PutStream
的建構函式如下:
func NewPutStream(server, object string) *PutStream { reader, writer := io.Pipe() // 通過管道將 兩個協程 聯絡起來 (用channel應該也可以把?) c := make(chan error) go func() { request, _ := http.NewRequest("PUT", "http://"+server+"/objects/"+object, reader) client := http.Client{} r, e := client.Do(request) // 如果 reader一直沒有資料,是不是 Do就會阻塞? if e == nil && r.StatusCode != http.StatusOK { e = fmt.Errorf("dataServer return http code %d", r.StatusCode) } c <- e }() return &PutStream{writer, c} }
先使用 io.Pipe()
構造一個 writer
和 reader
,再初始化一個通道 c
,最後使用 writer
和 c
構造一個 PutStream
指標物件返回。
中間開啟了一個子協程,該協程構造一個 http.NewRequest
,並使用 http.Client
構造客戶端傳送請求。其中 request
構造的時候使用了管道的讀端 reader
,此時該管道的讀端並沒有資料,但是 http.NewRequest
構造 request
的時候並不會阻塞,而是會阻塞在 client.Do(request)
這句程式碼,直到管道的寫端寫入資料。
如果得到的響應出錯了,將錯誤寫入管道 c
中。
同時這個 PutStream
需要實現 io.Writer
和 io.Writer
介面:
// 實現了 io.Writer介面 func (w *PutStream) Write(p []byte) (n int, err error) { return w.writer.Write(p) } // 關閉流並得到錯誤 func (w *PutStream) Close() error { w.writer.Close() // io.PipeWriter 關閉, reader也會關閉? client.Do(request)才能結束? return <-w.c }
由於功能是要上傳並儲存一個物件,所以實現一個 StoreObject
方法來呼叫 PutStream
。
func StoreObject(r io.Reader, object string) (int, error) { stream := NewPutStream(data_server, object) // 會阻塞,直到r中收到 EOF,stream實現了io.Writer介面 io.Copy(stream, r) // 將r的內容拷貝的stream 中,stream有資料的時候,他對應的reader也就有了資料 // 會阻塞到 stream中的c channel收到訊息 e := stream.Close() if e != nil { return http.StatusInternalServerError, e } return http.StatusOK, nil }
新建一個 PutStream
指標型別的 stream
,由於它實現了 io.Writer
介面,所以可以呼叫 io.Copy
將 r
中的內容複製到 stream
中,其實就是寫入到管道的寫端。最後關閉流,管道寫端寫入也就結束,讀端也讀取結束,子協程的傳送也就結束了。
其實該版本的實現和版本一是一樣的,只不過多了一個子協程,多使用了 io.Pipe
管道。
看起來其實版本一更加簡單、直接。暫時也看不出封裝的優勢,也許在後面功能越來越複雜的時候,就可以體現這個優勢。個人感覺這個封裝還是比較優雅。
完整程式碼
package objects, put.go
package objects import ( "net/http" "fmt" "io" ) type PutStream struct { writer *io.PipeWriter cchan error } const ( data_server = "127.0.0.1:8889" ) func StoreObject(r io.Reader, object string) (int, error) { stream := NewPutStream(data_server, object) // 會阻塞到 r中收到 EOFstream實現了io.Writer介面 io.Copy(stream, r) // 將r的內容拷貝的stream 中,stream有資料的時候,他對應的reader也就有了資料 // 會阻塞到 stream中的c channel收到訊息 e := stream.Close() if e != nil { return http.StatusInternalServerError, e } return http.StatusOK, nil } func NewPutStream(server, object string) *PutStream { reader, writer := io.Pipe() // 通過管道將 兩個協程 聯絡起來 (用channel應該也可以把?) c := make(chan error) go func() { request, _ := http.NewRequest("PUT", "http://"+server+"/objects/"+object, reader) client := http.Client{} r, e := client.Do(request) // 如果 reader一直沒有資料,是不是 Do就會阻塞? if e == nil && r.StatusCode != http.StatusOK { e = fmt.Errorf("dataServer return http code %d", r.StatusCode) } c <- e }() return &PutStream{writer, c} } // 實現了 io.Writer介面 func (w *PutStream) Write(p []byte) (n int, err error) { return w.writer.Write(p) } func (w *PutStream) Close() error { w.writer.Close() // io.PipeWriter 關閉, reader也會關閉? client.Do(request)才能結束? return <-w.c }
package main, apiServer.go
package main import ( "net/http" "io" "strings" "go-storage/apiServer/objects" "log" ) const dataServerAddr = "http://localhost:8889/objects/" func Handler(w http.ResponseWriter, r *http.Request) { m := r.Method if m == http.MethodGet { // get(w, r) return } else if m == http.MethodPut { put(w, r) return } w.WriteHeader(http.StatusMethodNotAllowed) } func put(w http.ResponseWriter, r *http.Request) { // object 要儲存的物件名 object := strings.Split(r.URL.EscapedPath(), "/")[2] c, e := objects.StoreObject(r.Body, object) if e != nil { log.Println(e) } w.WriteHeader(c) } func main() { http.HandleFunc("/objects/", Handler) http.ListenAndServe(":8888", nil) }
測試過程同版本一。
疑問
- 如果傳輸一個
4g
的檔案,到底需不需要佔用4g
內容?在上一篇文章中我認為這種流操作可以減小記憶體佔用,但是現在不太確定。