grpc-服務端與客戶端四種資料傳遞方式(2)
gpc服務端和客戶端的資料傳送有四種方式,客戶端啟動服務端的啟動程式碼在上篇文章已經描述,這裡將只列出關鍵實現的程式碼。
1.客戶端傳送一個物件,服務端返回一個物件
這種方式類似於傳統的Http請求資料的方式,在上篇文章有一個簡單的實現例子,在這裡不再描述。
2.客戶端傳送一個物件,服務端返回一個Stream物件
Stream物件在傳輸過程中會被當做集合,用Iterator來遍歷處理。來看一個實現例子:
proto檔案:
rpc GetStudentsByAge(StudentRequest) returns (stream StudentResponse) {}
grpc中和thrift不同的是請求和返回的物件必須是message,將物件宣告為Stream將會以流的方式傳輸。
服務端實現:
@Override
public void getStudentsByAge(StudentRequest request, StreamObserver<StudentResponse> responseObserver) {
System.out.println("接受客戶端資訊: " + request.getAge());
responseObserver.onNext(StudentResponse.newBuilder().setName("張三").setAge(20).setCity("北京").build ());
responseObserver.onNext(StudentResponse.newBuilder().setName("李四").setAge(30).setCity("天津").build());
responseObserver.onNext(StudentResponse.newBuilder().setName("王五").setAge(40).setCity("武漢").build());
responseObserver.onNext(StudentResponse.newBuilder().setName("趙六").setAge (50).setCity("深圳").build());
responseObserver.onCompleted();
}
類似第一種方式,這種方式通過多次呼叫onNext來組裝多個訊息,從而最後返回一個Stream物件。
客戶端實現:
Iterator<StudentResponse> iterable = blockingStub.getStudentsByAge(StudentRequest.newBuilder().setAge(20).build());
while (iterable.hasNext()) {
StudentResponse studentResponse = iterable.next();
System.out.println(studentResponse.getName() + "," + studentResponse.getAge() + "," + studentResponse.getCity());
}
3.客戶端傳送Stream物件,服務端返回一個簡單物件
proto檔案:
rpc GetStudentsWrapperByages(stream StudentRequest) returns (StudentResponseList) {}
服務端實現:
@Override
public StreamObserver<StudentRequest> getStudentsWrapperByages(StreamObserver<StudentResponseList> responseObserver) {
return new StreamObserver<StudentRequest>() {
@Override
public void onNext(StudentRequest value) {
System.out.println("onNext: " + value.getAge());
}
@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}
@Override
public void onCompleted() {
StudentResponse studentResponse = StudentResponse.newBuilder().setName("張三").setAge(20).setCity("西安").build();
StudentResponse studentResponse2 = StudentResponse.newBuilder().setName("李四").setAge(30).setCity("廣州").build();
StudentResponseList studentResponseList = StudentResponseList.newBuilder().addStudentResponse(studentResponse).addStudentResponse(studentResponse2).build();
responseObserver.onNext(studentResponseList);
responseObserver.onCompleted();
}
};
}
客戶端每個訊息過來都會呼叫一次onNext方法,當客戶端傳送完畢後,會執行onCompleted來返回一個物件給客戶端。
客戶端實現:
StreamObserver<StudentResponseList> studentResponseListStreamObserver = new StreamObserver<StudentResponseList>() {
@Override
public void onNext(StudentResponseList value) {
value.getStudentResponseList().forEach(studentResponse -> {
System.out.println(studentResponse.getName());
System.out.println(studentResponse.getAge());
System.out.println(studentResponse.getCity());
System.out.println("***********");
});
}
@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}
@Override
public void onCompleted() {
System.out.println("completed!");
}
};
StreamObserver<StudentRequest> studentRequestStreamObserver = studentServiceStub.getStudentsWrapperByages(studentResponseListStreamObserver);
studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(20).build());
studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(30).build());
studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(40).build());
studentRequestStreamObserver.onNext(StudentRequest.newBuilder().setAge(50).build());
studentRequestStreamObserver.onCompleted();
studentResponseListStreamObserver這一部分是響應服務端返回的資料,studentRequestStreamObserver這一部分是用來向服務端傳送一個Stream物件。當執行完並不會有我們預料的結果,有一個關鍵點需要注意:
客戶端向服務端傳送一個Stream, 不能跟之前一樣使用阻塞的stu去傳送請求,而應該使用非同步的stb來處理:
StudentServiceGrpc.StudentServiceStub studentServiceStub = StudentServiceGrpc.newStub(managedChannel);
原始碼如下:
/**
* Creates a new async stub that supports all call types for the service
*/
public static StudentServiceStub newStub(io.grpc.Channel channel) {
return new StudentServiceStub(channel);
}
建立一個非同步的stub去呼叫service,甚至會在onNext傳送的訊息還沒有傳送到服務端的時候整個程式就執行完了,因為是非同步的並不會有阻塞的等待,可以在程式末尾新增一個thread.sleep(seconds),然後程式會正常執行,可以驗證非同步的這個問題。
4.客戶端和服務端都傳輸的是Stream物件
proto檔案:
rpc BiTalk(stream StreamRequest) returns (stream StreamResponse) {}
服務端實現:
@Override
public StreamObserver<StreamRequest> biTalk(StreamObserver<StreamResponse> responseObserver) {
return new StreamObserver<StreamRequest>() {
@Override
public void onNext(StreamRequest value) {
System.out.println("onNest: " + value.getRequestInfo());
responseObserver.onNext(StreamResponse.newBuilder().setResponseInfo(UUID.randomUUID().toString()).build());
}
@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
客戶端實現:
@Override
public void onNext(StreamResponse value) {
System.out.println(value.getResponseInfo());
}
@Override
public void onError(Throwable t) {
System.out.println(t.getMessage());
}
@Override
public void onCompleted() {
System.out.println("onCompleted!");
}
});
for (int i = 0; i < 10;i ++) {
requestStreamObserver.onNext(StreamRequest.newBuilder().setRequestInfo(LocalDateTime.now().toString()).build());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
原始碼地址: