1. 程式人生 > >grpc-服務端與客戶端四種資料傳遞方式(2)

grpc-服務端與客戶端四種資料傳遞方式(2)

gpc服務端和客戶端的資料傳送有四種方式,客戶端啟動服務端的啟動程式碼在上篇文章已經描述,這裡將只列出關鍵實現的程式碼。

1.客戶端傳送一個物件,服務端返回一個物件

這種方式類似於傳統的Http請求資料的方式,在上篇文章有一個簡單的實現例子,在這裡不再描述。

2.客戶端傳送一個物件,服務端返回一個Stream物件

Stream物件在傳輸過程中會被當做集合,用Iterator來遍歷處理。來看一個實現例子:

proto檔案:

rpc GetStudentsByAge(StudentRequest) returns (stream StudentResponse) {}

grpc中和thrift不同的是請求和返回的物件必須是message,將物件宣告為Stream將會以流的方式傳輸。

服務端實現:

@Override
    public void getStudentsByAge(StudentRequest request, StreamObserver<StudentResponse> responseObserver) {
        System.out.println("接受客戶端資訊: " + request.getAge());
        responseObserver.onNext(StudentResponse.newBuilder().setName("張三").setAge(20).setCity("北京").build
()); responseObserver.onNext(StudentResponse.newBuilder().setName("李四").setAge(30).setCity("天津").build()); responseObserver.onNext(StudentResponse.newBuilder().setName("王五").setAge(40).setCity("武漢").build()); responseObserver.onNext(StudentResponse.newBuilder().setName("趙六").setAge
(50).setCity("深圳").build()); responseObserver.onCompleted(); }

類似第一種方式,這種方式通過多次呼叫onNext來組裝多個訊息,從而最後返回一個Stream物件。

客戶端實現:

Iterator<StudentResponse> iterable = blockingStub.getStudentsByAge(StudentRequest.newBuilder().setAge(20).build());
        while (iterable.hasNext()) {
            StudentResponse studentResponse = iterable.next();
            System.out.println(studentResponse.getName() + "," + studentResponse.getAge() + "," + studentResponse.getCity());
        }

3.客戶端傳送Stream物件,服務端返回一個簡單物件

proto檔案:

rpc GetStudentsWrapperByages(stream StudentRequest) returns (StudentResponseList) {}

服務端實現:

@Override
    public StreamObserver<StudentRequest> getStudentsWrapperByages(StreamObserver<StudentResponseList> responseObserver) {
        return new StreamObserver<StudentRequest>() {
            @Override
            public void onNext(StudentRequest value) {
                System.out.println("onNext: " + value.getAge());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                StudentResponse studentResponse = StudentResponse.newBuilder().setName("張三").setAge(20).setCity("西安").build();
                StudentResponse studentResponse2 = StudentResponse.newBuilder().setName("李四").setAge(30).setCity("廣州").build();

                StudentResponseList studentResponseList = StudentResponseList.newBuilder().addStudentResponse(studentResponse).addStudentResponse(studentResponse2).build();

                responseObserver.onNext(studentResponseList);
                responseObserver.onCompleted();
            }
        };
    }

客戶端每個訊息過來都會呼叫一次onNext方法,當客戶端傳送完畢後,會執行onCompleted來返回一個物件給客戶端。

客戶端實現:

StreamObserver<StudentResponseList> studentResponseListStreamObserver = new StreamObserver<StudentResponseList>() {
            @Override
            public void onNext(StudentResponseList value) {
                value.getStudentResponseList().forEach(studentResponse -> {
                    System.out.println(studentResponse.getName());
                    System.out.println(studentResponse.getAge());
                    System.out.println(studentResponse.getCity());
                    System.out.println("***********");

                });
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("completed!");
            }
        };

        StreamObserver<StudentRequest> studentRequestStreamObserver = studentServiceStub.getStudentsWrapperByages(studentResponseListStreamObserver);

     studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(20).build());
        studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(30).build());
        studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(40).build());
        studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(50).build());
        studentRequestStreamObserver.onCompleted();

studentResponseListStreamObserver這一部分是響應服務端返回的資料,studentRequestStreamObserver這一部分是用來向服務端傳送一個Stream物件。當執行完並不會有我們預料的結果,有一個關鍵點需要注意:

客戶端向服務端傳送一個Stream, 不能跟之前一樣使用阻塞的stu去傳送請求,而應該使用非同步的stb來處理:

StudentServiceGrpc.StudentServiceStub studentServiceStub = StudentServiceGrpc.newStub(managedChannel);

原始碼如下:

/**
   * Creates a new async stub that supports all call types for the service
   */
  public static StudentServiceStub newStub(io.grpc.Channel channel) {
    return new StudentServiceStub(channel);
  }

建立一個非同步的stub去呼叫service,甚至會在onNext傳送的訊息還沒有傳送到服務端的時候整個程式就執行完了,因為是非同步的並不會有阻塞的等待,可以在程式末尾新增一個thread.sleep(seconds),然後程式會正常執行,可以驗證非同步的這個問題。

4.客戶端和服務端都傳輸的是Stream物件

proto檔案:

rpc BiTalk(stream StreamRequest) returns (stream StreamResponse) {}

服務端實現:

@Override
    public StreamObserver<StreamRequest> biTalk(StreamObserver<StreamResponse> responseObserver) {
        return new StreamObserver<StreamRequest>() {
            @Override
            public void onNext(StreamRequest value) {
                System.out.println("onNest: " + value.getRequestInfo());

                responseObserver.onNext(StreamResponse.newBuilder().setResponseInfo(UUID.randomUUID().toString()).build());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
    }

客戶端實現:

            @Override
            public void onNext(StreamResponse value) {
                System.out.println(value.getResponseInfo());
            }

            @Override
            public void onError(Throwable t) {
                System.out.println(t.getMessage());
            }

            @Override
            public void onCompleted() {
                System.out.println("onCompleted!");
            }
        });

        for (int i = 0; i < 10;i ++) {
            requestStreamObserver.onNext(StreamRequest.newBuilder().setRequestInfo(LocalDateTime.now().toString()).build());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

原始碼地址: