1. 程式人生 > >從零開始搭建 gRPC 服務 - Golang 篇(二)

從零開始搭建 gRPC 服務 - Golang 篇(二)

從零開始搭建 gRPC 服務 - Golang 篇(一)中介紹瞭如何搭建 gRPC 環境並構建一個簡單的 gRPC 服務,本文將介紹 gRPCstreaming

流式 RPC

gRPC 基於標準的 HTTP/2 進行傳輸,可以方便的實現 streaming 功能。要在 gRPC 中使用 streaming,只需要在 proto 中在請求或響應前加上 stream 即可。

服務端流式 RPC:Server-side streaming RPC

客戶端向伺服器傳送請求並獲取流以讀取訊息序列;客戶端從返回的流中讀取,直到沒有更多訊息; gRPC 保證單個 RPC 呼叫中的訊息排序。

從零開始搭建 gRPC 服務 - Golang 篇(一)中的 helloworld.proto 中增加介面 LotsOfReplies

syntax = "proto3";

option go_package = "github.com/grpc/example/helloworld";

package helloworld;

// The greeting service definition.
service Greeter {
  rpc LotsOfReplies (HelloRequest) returns (stream HelloReply){}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

編譯 .proto 檔案

$ protoc helloworld.proto --go_out=output
$ tree .
.
├── helloworld.proto
└── output
    └── github.com
        └── grpc
            └── example
                └── helloworld
                    └── helloworld.pb.go

5 directories, 2 files

此時生成的程式碼就已經包含了流的處理,在使用上需要注意:伺服器端程式碼的實現要通過流的方式傳送響應。

編寫 server.go

package main

import (
	"context"
	"fmt"
	"log"
	"net"

	"google.golang.org/grpc"
	pb "./output/github.com/grpc/example/helloworld"
	"google.golang.org/grpc/reflection"
)

const (
	port = ":50051"
)

// server is used to implement helloworld.GreeterServer.
type server struct{}

func (s *server) LotsOfReplies(in *pb.HelloRequest, stream pb.Greeter_LotsOfRepliesServer) error {
	for idx := 0; idx < 10; idx ++ {
		stream.Send(&pb.HelloReply{Message: fmt.Sprintf("Hello %s %d", in.Name, idx)})
	}
	return nil
}


func main() {
	lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	pb.RegisterGreeterServer(s, &server{})
	// Register reflection service on gRPC server.
	reflection.Register(s)
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

如上程式碼所示,服務端在接收到請求後通過 stream 返回了 10 個響應。

編寫 client.go

package main

import (
	"context"
	"io"
	"log"
	"os"
	"time"

	"google.golang.org/grpc"
	pb "./output/github.com/grpc/example/helloworld"
)

const (
	address     = "localhost:50051"
	defaultName = "world"
)

func main() {
	// Set up a connection to the server.
	conn, err := grpc.Dial(address, grpc.WithInsecure())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := pb.NewGreeterClient(conn)

	// Contact the server and print out its response.
	name := defaultName
	if len(os.Args) > 1 {
		name = os.Args[1]
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	stream, err := c.LotsOfReplies(ctx, &pb.HelloRequest{Name: name})
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	for {
		reply, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatalf("%v.LotsOfReplies() = _, %v", c, err)
		}
		log.Printf("Greeting: %s\n", reply.Message)
	}
}

客戶端從 stream 中讀取到若干響應,直到讀到 EOF 結束。

執行 gRPC 服務

開啟兩個會話視窗,在其中之一執行:

$ go run server.go

在另一個會話視窗執行:

 $ go run client.go gRPC_stream
2018/12/23 21:31:38 Greeting: Hello gRPC_stream 0
2018/12/23 21:31:38 Greeting: Hello gRPC_stream 1
2018/12/23 21:31:38 Greeting: Hello gRPC_stream 2
2018/12/23 21:31:38 Greeting: Hello gRPC_stream 3
2018/12/23 21:31:38 Greeting: Hello gRPC_stream 4
2018/12/23 21:31:38 Greeting: Hello gRPC_stream 5
2018/12/23 21:31:38 Greeting: Hello gRPC_stream 6
2018/12/23 21:31:38 Greeting: Hello gRPC_stream 7
2018/12/23 21:31:38 Greeting: Hello gRPC_stream 8
2018/12/23 21:31:38 Greeting: Hello gRPC_stream 9

客戶端流式 RPC:Client-side streaming RPC

客戶端再次使用提供的流寫入一系列訊息並將其傳送到伺服器;一旦客戶端寫完訊息,它就等待伺服器讀取它們並返回它的響應; gRPC 保證在單個 RPC 呼叫中的訊息排序。

改寫 helloworld.proto ,增加 LotsOfGreetings

syntax = "proto3";

option go_package = "github.com/grpc/example/helloworld";

package helloworld;

// The greeting service definition.
service Greeter {
  rpc LotsOfGreetings (stream HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
  int32 index = 2;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

編寫 server.go

package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"net"

	"google.golang.org/grpc"
	pb "./output/github.com/grpc/example/helloworld"
	"google.golang.org/grpc/reflection"
)

const (
	port = ":50051"
)

// server is used to implement helloworld.GreeterServer.
type server struct{}

func (s *server) LotsOfGreetings(stream pb.Greeter_LotsOfGreetingsServer) error {
	var total int32
	var name string
	for {
		greeting, err := stream.Recv()
		if err == io.EOF {
			return stream.SendAndClose(&pb.HelloReply{
				Message: fmt.Sprintf("Hello %s, total %d", name, total),
			})
		}
		if err != nil {
			return err
		}
		name = greeting.Name
		total += greeting.Index
	}
	return nil
}

func main() {
	lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	pb.RegisterGreeterServer(s, &server{})
	// Register reflection service on gRPC server.
	reflection.Register(s)
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

服務端通過 stream 接收到若干請求,直到讀到 EOF 後再返回響應。

編寫 client.go

package main

import (
	"context"
	"log"
	"os"
	"time"

	"google.golang.org/grpc"
	pb "./output/github.com/grpc/example/helloworld"
)

const (
	address     = "localhost:50051"
	defaultName = "world"
)

func main() {
	// Set up a connection to the server.
	conn, err := grpc.Dial(address, grpc.WithInsecure())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := pb.NewGreeterClient(conn)

	// Contact the server and print out its response.
	name := defaultName
	if len(os.Args) > 1 {
		name = os.Args[1]
	}
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	stream, err := c.LotsOfGreetings(ctx)
	if err != nil {
		log.Fatalf("could not greet: %v", err)
	}
	for idx := 0; idx < 10; idx ++ {
		if err := stream.Send(&pb.HelloRequest{
			Name: name,
			Index: int32(idx),
		}); err != nil {
			log.Fatalf("send err: %v", err)
		}
	}
	reply, err := stream.CloseAndRecv()
	if err != nil {
		log.Fatalf("%v.CloseAndRecv() got error %v, want %v", stream, err, nil)
	}
	log.Printf("Greeting: %s\n", reply.Message)
}

客戶端通過 stream 發起 10 個請求,然後關閉 stream 並接收響應。

執行 gRPC 服務

開啟兩個會話視窗,在其中之一執行:

$ go run server.go

在另一個會話視窗執行:

 $ go run client.go gRPC_stream
2018/12/23 22:06:43 Greeting: Hello gRPC_stream, total 45

雙向流式 RPC:Bidirectional streaming RPC

雙方使用讀寫流傳送一系列訊息,這兩個流獨立執行,因此客戶端和伺服器可以按照他們喜歡的順序進行讀寫:例如伺服器可以在寫入響應之前等待接收所有客戶端訊息,或者它可以交替讀取訊息然後寫入訊息,或其他一些讀寫組合;gRPC 保證在單個 RPC 呼叫中的訊息排序。

改寫 helloworld.proto ,增加 BidiHello

syntax = "proto3";

option go_package = "github.com/grpc/example/helloworld";

package helloworld;

// The greeting service definition.
service Greeter {
  rpc BidiHello(stream HelloRequest) returns (stream HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
  int32 index = 2;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

編寫 server.go

package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"net"
	"strings"

	"google.golang.org/grpc"
	pb "./output/github.com/grpc/example/helloworld"
	"google.golang.org/grpc/reflection"
)

const (
	port = ":50051"
)

// server is used to implement helloworld.GreeterServer.
type server struct{}

func (s *server) BidiHello(stream pb.Greeter_BidiHelloServer) error {
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		message := strings.Replace(in.Name, "嗎", "", -1)
		message = strings.Replace(message, "?", "!", -1)
		err = stream.Send(&pb.HelloReply{Message: message})
		if err != nil {
			return err
		}
	}
	return nil
}

func main() {
	lis, err := net.Listen("tcp", port)
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	s := grpc.NewServer()
	pb.RegisterGreeterServer(s, &server{})
	// Register reflection service on gRPC server.
	reflection.Register(s)
	if err := s.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}

服務端從 stream 中讀取到請求後立即返回。

編寫 client.go

package main

import (
	"context"
	"fmt"
	"log"
	"io"
	"time"

	"google.golang.org/grpc"
	pb "./output/github.com/grpc/example/helloworld"
)

const (
	address     = "localhost:50051"
)

func main() {
	// Set up a connection to the server.
	conn, err := grpc.Dial(address, grpc.WithInsecure())
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := pb.NewGreeterClient(conn)

	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	stream, err := c.BidiHello(ctx)
	if err != nil {
		log.Fatalf("%v.BidiHello(_) = _, %v", c, err)
	}
	waitc := make(chan struct{})
	go func() {
		for {
			in, err := stream.Recv()
			if err == io.EOF {
				// read done.
				close(waitc)
				return
			}
			if err != nil {
				log.Fatalf("Failed to receive a note : %v", err)
			}
			fmt.Printf("AI: %s\n", in.Message)
		}
	}()

	for {
		request := &pb.HelloRequest{}
		fmt.Scanln(&request.Name)
		if request.Name == "quit" {
			break
		}
		if err := stream.Send(request); err != nil {
			log.Fatalf("Failed to send a req: %v", err)
		}
	}

	stream.CloseSend()
	<-waitc
}

客戶端從標準輸出接收輸入,然後通過 stream 傳送請求,另一個 goroutine 則不斷從 stream 中接收響應。

執行 gRPC 服務

開啟兩個會話視窗,在其中之一執行:

$ go run server.go

在另一個會話視窗執行:

$ go run client.go
在嗎?
AI: 在!
你好
AI: 你好
能聽懂漢語嗎?
AI: 能聽懂漢語!
真的嗎?
AI: 真的!
quit