歡迎訪問我的GitHub

https://github.com/zq2599/blog_demos

內容:所有原創文章分類彙總及配套原始碼,涉及Java、Docker、Kubernetes、DevOPS等;

《java版gRPC實戰》全系列連結

  1. 用proto生成程式碼
  2. 服務釋出和呼叫
  3. 服務端流
  4. 客戶端流
  5. 雙向流
  6. 客戶端動態獲取服務端地址
  7. 基於eureka的註冊發現

客戶端為什麼要動態獲取服務端地址

本文是《java版gRPC實戰》系列的第六篇,前面咱們在開發客戶端應用時,所需的服務端地址都是按如下步驟設定的:

  • 在application.yml中配置,如下圖:

  • 在用到gRPC的bean中,使用註解GrpcClient即可將Stub類注入到成員變數中:

  • 上述操作方式的優點是簡單易用好配置,缺點也很明顯:服務端的IP地址或者埠一旦有變化,就必須修改application.yml並重啟客戶端應用;

為什麼不用註冊中心

  • 您一定會想到解決上述問題最簡單的方法就是使用註冊中心,如nacos、eureka等,其實我也是這麼想的,直到有一天,由於工作原因,我要在一個已有的gRPC微服務環境部署自己的應用,這個微服務環境並非java技術棧,而是基於golang的,他們都使用了go-zero框架( 老扎心了),這個go-zero框架沒有提供java語言的SDK,因此,我只能服從go-zero框架的規則,從etcd中取得其他微服務的地址資訊,才能呼叫其他gRPC服務端,如下圖所示:

  • 如此一來,咱們之前那種在application.yml中配置服務端資訊的方法就用不上了,本篇咱們來開發一個新的gRPC客戶端應用,滿足以下需求:
  1. 建立Stub物件的時候,服務端的資訊不再來自注解GrpcClient,而是來自查詢etcd的結果;
  2. etcd上的服務端資訊有變化的時候,客戶端可以及時更新,而不用重啟應用;

本篇概覽

  • 本篇要開發名為get-service-addr-from-etcd的springboot應用,該應用從etcd取得local-server應用的IP和埠,然後呼叫local-server的sayHello介面,如下圖:

  1. 開發客戶端應用;
  2. 部署gRPC服務端應用;
  3. 部署etcd;
  4. 模擬go-zero的規則,將服務端應用的IP地址和埠寫入etcd;
  5. 啟動客戶端應用,驗證能否正常呼叫服務端的服務;
  6. 重啟服務端,重啟的時候修改埠;
  7. 修改etcd中服務端的埠資訊;
  8. 呼叫介面觸發客戶端重新例項化Stub物件;
  9. 驗證客戶端能否正常呼叫修改了埠的服務端服務;

原始碼下載

名稱 連結 備註
專案主頁 https://github.com/zq2599/blog_demos 該專案在GitHub上的主頁
git倉庫地址(https) https://github.com/zq2599/blog_demos.git 該專案原始碼的倉庫地址,https協議
git倉庫地址(ssh) [email protected]:zq2599/blog_demos.git 該專案原始碼的倉庫地址,ssh協議
  • 這個git專案中有多個資料夾,《java版gRPC實戰》系列的原始碼在grpc-tutorials資料夾下,如下圖紅框所示:

  • grpc-tutorials資料夾下有多個目錄,本篇文章對應的客戶端程式碼在get-service-addr-from-etcd目錄下,如下圖:

開發客戶端應用

  • 在父工程的build.gradle檔案中新增一行,這是etcd相關的庫,如下圖紅框所示:

  • 在父工程grpc-turtorials下面新建名為get-service-addr-from-etcd的模組,其build.gradle內容如下:
plugins {
id 'org.springframework.boot'
} dependencies {
implementation 'org.projectlombok:lombok'
implementation 'org.springframework.boot:spring-boot-starter'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'net.devh:grpc-client-spring-boot-starter'
implementation 'io.etcd:jetcd-core'
implementation project(':grpc-lib')
}
  • 配置檔案application.yml,設定自己的web埠號和應用名,另外grpc.etcdendpoints是etcd叢集的地址資訊:
server:
port: 8084
spring:
application:
name: get-service-addr-from-etcd grpc:
# etcd的地址,從此處取得gRPC服務端的IP和埠
etcdendpoints: 'http://192.168.72.128:2379,http://192.168.50.239:2380,http://192.168.50.239:2381'
  • 啟動類DynamicServerAddressDemoApplication.java的程式碼就不貼了,普通的springboot啟動類而已;

  • 新增StubWrapper.java檔案,這是個spring bean,要重點關注的是simpleBlockingStub方法,當bean在spring註冊的時候simpleBlockingStub方法會被執行,這樣每當bean在spring註冊時,都會從etcd查詢gRPC服務端資訊,然後建立SimpleBlockingStub物件:

package com.bolingcavalry.dynamicrpcaddr;

import com.bolingcavalry.grpctutorials.lib.SimpleGrpc;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.KV;
import io.etcd.jetcd.kv.GetResponse;
import io.grpc.Channel;
import io.grpc.ManagedChannelBuilder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.util.Arrays;
import static com.google.common.base.Charsets.UTF_8; /**
* @author will ([email protected])
* @version 1.0
* @description: 包裝了SimpleBlockingStub例項的類,發起gRPC請求時需要用到SimpleBlockingStub例項
* @date 2021/5/8 19:34
*/
@Component("stubWrapper")
@Data
@Slf4j
@ConfigurationProperties(prefix = "grpc")
public class StubWrapper { /**
* 這是etcd中的一個key,該key對應的值是grpc服務端的地址資訊
*/
private static final String GRPC_SERVER_INFO_KEY = "/grpc/local-server"; /**
* 配置檔案中寫好的etcd地址
*/
private String etcdendpoints; private SimpleGrpc.SimpleBlockingStub simpleBlockingStub; /**
* 從etcd查詢gRPC服務端的地址
* @return
*/
public String[] getGrpcServerInfo() {
// 建立client類
KV kvClient = Client.builder().endpoints(etcdendpoints.split(",")).build().getKVClient(); GetResponse response = null; // 去etcd查詢/grpc/local-server這個key的值
try {
response = kvClient.get(ByteSequence.from(GRPC_SERVER_INFO_KEY, UTF_8)).get();
} catch (Exception exception) {
log.error("get grpc key from etcd error", exception);
} if (null==response || response.getKvs().isEmpty()) {
log.error("empty value of key [{}]", GRPC_SERVER_INFO_KEY);
return null;
} // 從response中取得值
String rawAddrInfo = response.getKvs().get(0).getValue().toString(UTF_8); // rawAddrInfo是“192.169.0.1:8080”這樣的字串,即一個IP和一個埠,用":"分割,
// 這裡用":"分割成陣列返回
return null==rawAddrInfo ? null : rawAddrInfo.split(":");
} /**
* 每次註冊bean都會執行的方法,
* 該方法從etcd取得gRPC服務端地址,
* 用於例項化成員變數SimpleBlockingStub
*/
@PostConstruct
public void simpleBlockingStub() {
// 從etcd獲取地址資訊
String[] array = getGrpcServerInfo(); log.info("create stub bean, array info from etcd {}", Arrays.toString(array)); // 陣列的第一個元素是gRPC服務端的IP地址,第二個元素是埠
if (null==array || array.length<2) {
log.error("can not get valid grpc address from etcd");
return;
} // 陣列的第一個元素是gRPC服務端的IP地址
String addr = array[0];
// 陣列的第二個元素是埠
int port = Integer.parseInt(array[1]); // 根據剛才獲取的gRPC服務端的地址和埠,建立channel
Channel channel = ManagedChannelBuilder
.forAddress(addr, port)
.usePlaintext()
.build(); // 根據channel建立stub
simpleBlockingStub = SimpleGrpc.newBlockingStub(channel);
}
}
  • GrpcClientService是封裝了StubWrapper的服務類:
package com.bolingcavalry.dynamicrpcaddr;

import com.bolingcavalry.grpctutorials.lib.HelloReply;
import com.bolingcavalry.grpctutorials.lib.HelloRequest;
import com.bolingcavalry.grpctutorials.lib.SimpleGrpc;
import io.grpc.StatusRuntimeException;
import lombok.Setter;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service; @Service
public class GrpcClientService { @Autowired(required = false)
@Setter
private StubWrapper stubWrapper; public String sendMessage(final String name) {
// 很有可能simpleStub物件為null
if (null==stubWrapper) {
return "invalid SimpleBlockingStub, please check etcd configuration";
} try {
final HelloReply response = stubWrapper.getSimpleBlockingStub().sayHello(HelloRequest.newBuilder().setName(name).build());
return response.getMessage();
} catch (final StatusRuntimeException e) {
return "FAILED with " + e.getStatus().getCode().name();
}
}
}
  • 新增一個controller類GrpcClientController,提供一個http介面,裡面會呼叫GrpcClientService的方法,最終完成遠端gRPC呼叫:
package com.bolingcavalry.dynamicrpcaddr;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController; @RestController
public class GrpcClientController { @Autowired
private GrpcClientService grpcClientService; @RequestMapping("/")
public String printMessage(@RequestParam(defaultValue = "will") String name) {
return grpcClientService.sendMessage(name);
}
}
  • 接下來新增一個controller類RefreshStubInstanceController,對外提供一個http介面refreshstub,作用是刪掉stubWrapper這個bean,再重新註冊一次,這樣每當外部呼叫refreshstub介面,就可以從etcd取得服務端資訊再重新例項化SimpleBlockingStub成員變數,這樣就達到了客戶端動態獲取服務端地址的效果:
package com.bolingcavalry.dynamicrpcaddr;

import com.bolingcavalry.grpctutorials.lib.SimpleGrpc;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.support.AbstractBeanDefinition;
import org.springframework.beans.factory.support.BeanDefinitionBuilder;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController; @RestController
public class RefreshStubInstanceController implements ApplicationContextAware { private ApplicationContext applicationContext; @Autowired
private GrpcClientService grpcClientService; @Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
} @RequestMapping("/refreshstub")
public String refreshstub() { String beanName = "stubWrapper"; //獲取BeanFactory
DefaultListableBeanFactory defaultListableBeanFactory = (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory(); // 刪除已有bean
defaultListableBeanFactory.removeBeanDefinition(beanName); //建立bean資訊.
BeanDefinitionBuilder beanDefinitionBuilder = BeanDefinitionBuilder.genericBeanDefinition(StubWrapper.class); //動態註冊bean.
defaultListableBeanFactory.registerBeanDefinition(beanName, beanDefinitionBuilder.getBeanDefinition()); // 更新引用關係(注意,applicationContext.getBean方法很重要,會觸發StubWrapper例項化操作)
grpcClientService.setStubWrapper(applicationContext.getBean(StubWrapper.class)); return "Refresh success";
}
}
  • 編碼完成,開始驗證;

部署gRPC服務端應用

部署gRPC服務端應用很簡單,啟動local-server應用即可:

部署etcd

  • 為了簡化操作,我這裡的etcd叢集是用docker部署的,對應的docker-compose.yml檔案內容如下:
version: '3'
services:
etcd1:
image: "quay.io/coreos/etcd:v3.4.7"
entrypoint: /usr/local/bin/etcd
command:
- '--name=etcd1'
- '--data-dir=/etcd_data'
- '--initial-advertise-peer-urls=http://etcd1:2380'
- '--listen-peer-urls=http://0.0.0.0:2380'
- '--listen-client-urls=http://0.0.0.0:2379'
- '--advertise-client-urls=http://etcd1:2379'
- '--initial-cluster-token=etcd-cluster'
- '--heartbeat-interval=250'
- '--election-timeout=1250'
- '--initial-cluster=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380'
- '--initial-cluster-state=new'
ports:
- 2379:2379
volumes:
- ./store/etcd1/data:/etcd_data
etcd2:
image: "quay.io/coreos/etcd:v3.4.7"
entrypoint: /usr/local/bin/etcd
command:
- '--name=etcd2'
- '--data-dir=/etcd_data'
- '--initial-advertise-peer-urls=http://etcd2:2380'
- '--listen-peer-urls=http://0.0.0.0:2380'
- '--listen-client-urls=http://0.0.0.0:2379'
- '--advertise-client-urls=http://etcd2:2379'
- '--initial-cluster-token=etcd-cluster'
- '--heartbeat-interval=250'
- '--election-timeout=1250'
- '--initial-cluster=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380'
- '--initial-cluster-state=new'
ports:
- 2380:2379
volumes:
- ./store/etcd2/data:/etcd_data
etcd3:
image: "quay.io/coreos/etcd:v3.4.7"
entrypoint: /usr/local/bin/etcd
command:
- '--name=etcd3'
- '--data-dir=/etcd_data'
- '--initial-advertise-peer-urls=http://etcd3:2380'
- '--listen-peer-urls=http://0.0.0.0:2380'
- '--listen-client-urls=http://0.0.0.0:2379'
- '--advertise-client-urls=http://etcd3:2379'
- '--initial-cluster-token=etcd-cluster'
- '--heartbeat-interval=250'
- '--election-timeout=1250'
- '--initial-cluster=etcd1=http://etcd1:2380,etcd2=http://etcd2:2380,etcd3=http://etcd3:2380'
- '--initial-cluster-state=new'
ports:
- 2381:2379
volumes:
- ./store/etcd3/data:/etcd_data
  • 準備好上述檔案後,執行docker-compose up -d即可建立叢集;

將服務端應用的IP地址和埠寫入etcd

  • 我這邊local-server所在伺服器IP是192.168.50.5,埠9898,所以執行以下命令將local-server資訊寫入etcd:
docker exec 08_etcd2_1 /usr/local/bin/etcdctl put /grpc/local-server 192.168.50.5:9898

啟動客戶端應用

  • 開啟DynamicServerAddressDemoApplication.java,點選下圖紅框位置,即可啟動客戶端應用:

  • 注意下圖紅框中的日誌,該日誌證明客戶端應用從etcd獲取服務端資訊成功:

  • 瀏覽器訪問應用get-service-addr-from-etcd的http介面,成功收到響應,證明gRPC呼叫成功:

  • 去看local-server的控制檯,如下圖紅框,證明遠端呼叫確實執行了:

重啟服務端,重啟的時候修改埠

  • 為了驗證動態獲取服務端資訊是否有效,咱們先把local-server應用的埠改一下,如下圖紅框,改成9899:

  • 改完重啟local-server,如下圖紅框,可見gRPC埠已經改為9899:

  • 這時候再訪問get-service-addr-from-etcd的http介面,由於get-service-addr-from-etcd不知道local-server的監聽埠發生了改變,因此還是去訪問9898埠,毫無意外的返回了失敗:

修改etcd中服務端的埠資訊

現在執行以下命令,將etcd中的服務端資訊改為正確的:

docker exec 08_etcd2_1 /usr/local/bin/etcdctl put /grpc/local-server 192.168.50.5:9899

呼叫介面觸發客戶端重新例項化Stub物件

  • 聰明的您一定知道接下來要做的事情了:讓StubWrapper的bean重新在spring環境註冊,也就是呼叫RefreshStubInstanceController提供的http介面refreshstub:

  • 檢視get-service-addr-from-etcd應用的控制檯,如下圖紅框,StubWrapper已經重新註冊了,並且從etcd取得了最新的服務端資訊:

驗證客戶端能否正常呼叫修改了埠的服務端服務

  • 再次訪問get-service-addr-from-etcd應用的web介面,如下圖,gRPC呼叫成功:

  • 至此,在不修改配置不重啟服務的情況下,客戶端也可以適應服務端的變化了,當然了,本文只是提供基本的操作參考,實際上的微服務環境會更復雜,例如refreshstub介面可能被其他服務呼叫,這樣服務端有了變化可以更加及時地被更新,還有客戶端本身也肯能是gRPC服務提供方,那也要把自己註冊到etcd上去,還有利用etcd的watch功能監控指定的服務端是否一直存活,以及同一個gRPC服務的多個例項如何做負載均衡,等等,這些都要根據您的實際情況來定製;

  • 本篇內容過多,可見對於這些官方不支援的微服務環境,咱們自己去做註冊發現的適配很費時費力的,如果設計和選型能自己做主,我們更傾向於使用現成的註冊中心,接下來的文章,咱們就一起嘗試使用eureka為gRPC提供註冊發現服務;

你不孤單,欣宸原創一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 資料庫+中介軟體系列
  6. DevOps系列

歡迎關注公眾號:程式設計師欣宸

微信搜尋「程式設計師欣宸」,我是欣宸,期待與您一同暢遊Java世界...

https://github.com/zq2599/blog_demos