zookeeper開源客戶端Curator典型應用場景之-服務註冊與發現(十一)
隨著業務增加,以前簡單的系統已經變得越來越複雜,單純的提升伺服器效能也不是辦法,而且程式碼也是越來越龐大,維護也變得越來越困難,這一切都催生了新的架構設計風格 – 微服務架構的出現。
微服務給我們帶來了很多好處,例如:獨立可擴充套件、易維護。但是隨著應用的分解,微服務的引入,服務越來越多,業務系統與服務系統之間的呼叫,該架構的問題暴露出來:最明顯的問題是所有的請求都需要nginx來轉發,隨著內部服務的越來越多,服務上線都需要修改nginx的配置。一旦內部網路調整,nginx upstream也需要做對應的配置調整,並且每個服務都還需要維護nginx的地址。所以,服務註冊中心誕生了。
什麼是服務註冊中心
註冊中心可以對服務上下線做統一管理。每個工作伺服器都可以作為資料的釋出方向叢集註冊自己的基本資訊,而讓某些監控伺服器作為訂閱方,訂閱工作伺服器的基本資訊,當工作伺服器的基本資訊發生改變如上下線、伺服器角色或服務範圍變更,監控伺服器可以得到通知並響應這些變化。服務自動註冊與發現後,不再需要寫死服務提供方地址,註冊中心基於介面名查詢服務提供者的IP地址,並且能夠平滑新增或刪除服務提供者。
當下火的註冊中心有Eureka與ZooKeeper,本篇就講一下使用zookeeper的開源客戶端Curator實現服務的註冊與發現:
Service Discovery
我們通常在呼叫服務的時候,需要知道服務的地址,埠,或者其他一些資訊,通常情況下,我們是把他們寫到程式裡面,但是隨著服務越來越多,維護起來也越來越費勁,更重要的是,由於地址都是在程式中配置的,我們根本不知道遠端的服務是否可用,當我們增加或者刪除服務,我們又需要到配置檔案中配置麼? 這時候,Zookeeper幫大忙了,我們可以把我們的服務註冊到Zookeeper中,建立一個臨時節點(當連線斷開之後,節點將被刪除),存放我們的服務資訊(url,ip,port等資訊),把這些臨時節點都存放在以serviceName命名的節點下面,這樣我們要獲取某個服務的地址,只需要到Zookeeper中找到這個path,然後就可以讀取到裡面存放的服務資訊,這時候我們就可以根據這些資訊呼叫我們的服務。這樣,通過Zookeeper我們就做到了動態的新增和刪除服務,做到了一旦一個服務時效,就會自動從Zookeeper中移除。
Curator Service Discovery就是為了解決這個問題而生的,它對此抽象出了ServiceInstance、ServiceProvider、ServiceDiscovery三個介面,通過它我們可以很輕易的實現Service Discovery。
ServiceInstance
Curator中使用ServiceInstance作為一個服務例項,ServiceInstances具有名稱,ID,地址,埠和/或ssl埠以及可選的payload(使用者定義)。ServiceInstances以下列方式序列化並存儲在ZooKeeper中:
ServiceProvider
ServiceProvider是主要的抽象類。它封裝了發現服務為特定的命名服務和提供者策略。提供者策略方案是從一組給定的服務例項選擇一個例項。有三個捆綁策略:輪詢排程、隨機和粘性(總是選擇相同的一個)。
ServiceProviders是使用ServiceProviderBuilder分配的。消費者可以從從ServiceDiscovery獲取ServiceProviderBuilder(參見下文)。ServiceProviderBuilder允許您設定服務名稱和其他幾個可選值。
必須通過呼叫start()來啟動ServiceProvider 。完成後,您應該呼叫close()。ServiceProvider中有以下兩個重要方法:
/** 獲取一個服務例項 */
public ServiceInstance<T> getInstance() throws Exception;
/** 獲取所有的服務例項 */
public Collection<ServiceInstance<T>> getAllInstances() throws Exception;
注意:在使用curator 2.x(ZooKeep3.4.x)時,服務提供者物件必須由應用程式快取並重用。因為服務提供者新增的內部NamespaceWatcher物件無法在ZooKeep3.4.x中刪除,所以為每個對相同服務的呼叫建立一個新的服務提供者最終將耗盡JVM的記憶體。
ServiceDiscovery
為了建立ServiceProvider,你必須有一個ServiceDiscovery。它是由一個ServiceDiscoveryBuilder建立。開始必須呼叫start()方法。當使用完成應該呼叫close()方法。
如果特定例項有I/O錯誤,等等。您應該呼叫ServiceProvider.NoteError(),並傳入例項。ServiceProvider將臨時將有錯誤的例項視為“關閉”。例項的閾值和超時是通過DownInstancePolicy設定的,該策略可以傳遞給ServiceProviderBuilder(注意:如果沒有指定預設DownInstancePolicy,則使用預設DownInstancePolicy)。
更細節API介紹
ServiceProvider API是大多數用途所需要的。但是,對於更精細的控制,您可以使用以下方法:
註冊/登出服務
通常,將應用程式的服務描述符傳遞給ServiceDiscovery建構函式,它將自動註冊/登出服務。但是,如果需要手動執行此操作,請使用以下方法:
/** 註冊服務 */
public void registerService(ServiceInstance<T> service) throws Exception;
/** 登出服務 */
public void unregisterService(ServiceInstance<T> service) throws Exception;
查詢服務
您可以查詢所有服務名稱、特定服務的所有例項或單個服務例項。
/** 查詢所有服務名稱 */
public Collection<String> queryForNames() throws Exception;
/** 查詢特定服務的所有例項 */
public Collection<ServiceInstance<T>> queryForInstances(String name) throws Exception;
/** 查詢單個服務例項 */
public ServiceInstance<T> queryForInstance(String name, String id) throws Exception;
服務快取
上述每個查詢方法都直接呼叫ZooKeeper。如果經常查詢服務,還可以使用ServiceCache。它在記憶體中快取特定服務的例項列表。它使用Watcher監聽使列表保持最新。
可以通過ServiceDiscovery.serviceCacheBuilder()返回的構建器分配ServiceCache 。通過呼叫start()啟動ServiceCache物件,完成後,應呼叫close()。然後可以通過呼叫以下內容獲取服務的當前已知例項列表:
/** 獲取快取服務列表 */
public List<ServiceInstance<T>> getInstances();
ServiceCache支援在Watcher更新例項列表時收到通知的偵聽器:
/**
* Listener for changes to a service cache
*/
public interface ServiceCacheListener extends ConnectionStateListener {
/**
* Called when the cache has changed (instances added/deleted, etc.)
*/
public void cacheChanged();
}
實踐
下面我們自己來實踐一下,來測試是否可以靈活部署(隨意 增加/刪除 機器)?
pom新增依賴
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-x-discovery</artifactId>
<version>4.0.1</version>
</dependency>
服務擴充套件資訊類
該類中可以自定義一些自己想要的屬性,例如方法需要的引數,方法的描述等等。
/**
* @author: wangsaichao
* @date: 2018/9/29
* @description: 服務附加資訊
*/
public class InstanceDetails {
public static final String ROOT_PATH = "/service";
/** 該服務擁有哪些方法 */
public Map<String,Service> services = new HashMap<>();
/** 服務描述 */
private String serviceDesc;
public InstanceDetails(){
this.serviceDesc = "";
}
public InstanceDetails(String serviceDesc){
this.serviceDesc = serviceDesc;
}
public String getServiceDesc() {
return serviceDesc;
}
public void setServiceDesc(String serviceDesc) {
this.serviceDesc = serviceDesc;
}
public Map<String, Service> getServices() {
return services;
}
public void setServices(Map<String, Service> services) {
this.services = services;
}
public static class Service {
/** 方法名稱 */
private String methodName;
/** 方法描述 */
private String desc;
/** 方法所需要的引數列表 */
private List<String> params;
public String getMethodName() {
return methodName;
}
public void setMethodName(String methodName) {
this.methodName = methodName;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;
}
public List<String> getParams() {
return params;
}
public void setParams(List<String> params) {
this.params = params;
}
}
}
服務生產者1
/**
* @author: wangsaichao
* @date: 2018/9/30
* @description: 服務1
*/
public class ProviderService1 {
public static void main(String[] args) throws Exception {
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.58.42:2181",
2000,2000, new ExponentialBackoffRetry(1000, 3));
client.start();
client.blockUntilConnected();
//服務構造器
ServiceInstanceBuilder<InstanceDetails> sib = ServiceInstance.builder();
//該服務中所有的介面
Map<String,InstanceDetails.Service> services = new HashMap<>();
// 新增訂單服務介面
//服務所需要的引數
ArrayList<String> addOrderParams = new ArrayList<>();
addOrderParams.add("createTime");
addOrderParams.add("state");
InstanceDetails.Service addOrderService = new InstanceDetails.Service();
addOrderService.setDesc("新增訂單");
addOrderService.setMethodName("addOrder");
addOrderService.setParams(addOrderParams);
services.put("addOrder",addOrderService);
//新增刪除訂單服務介面
ArrayList<String> delOrderParams = new ArrayList<>();
delOrderParams.add("orderId");
InstanceDetails.Service delOrderService = new InstanceDetails.Service();
delOrderService.setDesc("刪除訂單");
delOrderService.setMethodName("delOrder");
delOrderService.setParams(delOrderParams);
services.put("delOrder",delOrderService);
//服務的其他資訊
InstanceDetails payload = new InstanceDetails();
payload.setServiceDesc("訂單服務");
payload.setServices(services);
//將服務新增到 ServiceInstance
ServiceInstance<InstanceDetails> orderService = sib.address("127.0.0.1")
.port(8080)
.name("OrderService")
.payload(payload)
.uriSpec(new UriSpec("{scheme}://{address}:{port}"))
.build();
//構建 ServiceDiscovery 用來註冊服務
ServiceDiscovery<InstanceDetails> serviceDiscovery = ServiceDiscoveryBuilder.builder(InstanceDetails.class)
.client(client)
.serializer(new JsonInstanceSerializer<InstanceDetails>(InstanceDetails.class))
.basePath(InstanceDetails.ROOT_PATH)
.build();
//服務註冊
serviceDiscovery.registerService(orderService);
serviceDiscovery.start();
System.out.println("第一臺服務註冊成功......");
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
serviceDiscovery.close();
client.close();
}
}
服務生產者2
服務生產者2和服務生產者1唯一不同的地方只是埠號改了一下,用來模擬兩臺不同的機器。
服務消費者
/**
* @author: wangsaichao
* @date: 2018/9/30
* @description:
*/
public class ConsumerClient {
public static void main(String[] args) throws Exception{
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.58.42:2181", new ExponentialBackoffRetry(1000, 3));
client.start();
client.blockUntilConnected();
ServiceDiscovery<InstanceDetails> serviceDiscovery = ServiceDiscoveryBuilder.builder(InstanceDetails.class)
.client(client)
.basePath(InstanceDetails.ROOT_PATH)
.serializer(new JsonInstanceSerializer<InstanceDetails>(InstanceDetails.class))
.build();
serviceDiscovery.start();
boolean flag = true;
//死迴圈來不停的獲取服務列表,檢視是否有新服務釋出
while(flag){
//根據名稱獲取服務
Collection<ServiceInstance<InstanceDetails>> services = serviceDiscovery.queryForInstances("OrderService");
if(services.size() == 0){
System.out.println("當前沒有發現服務");
Thread.sleep(10 * 1000);
continue;
}
for(ServiceInstance<InstanceDetails> service : services) {
//獲取請求的scheme 例如:http://127.0.0.1:8080
String uriSpec = service.buildUriSpec();
//獲取服務的其他資訊
InstanceDetails payload = service.getPayload();
//服務描述
String serviceDesc = payload.getServiceDesc();
//獲取該服務下的所有介面
Map<String, InstanceDetails.Service> allService = payload.getServices();
Set<Map.Entry<String, InstanceDetails.Service>> entries = allService.entrySet();
for (Map.Entry<String, InstanceDetails.Service> entry: entries) {
System.out.println(serviceDesc +uriSpec
+ "/" + service.getName()
+ "/" + entry.getKey()
+ " 該方法需要的引數為:"
+ entry.getValue().getParams().toString());
}
}
System.out.println("---------------------");
Thread.sleep(10*1000);
}
}
}
先啟動服務消費者,當前沒有發現服務,啟動服務生產者1 日誌列印生產者1的服務列表, 再啟動服務生產者2 日誌列印生產者1 和 生產者2的服務列表,然後停止生產者1 服務,日誌只打印生產者2的服務列表,最後全停掉,列印當前沒有服務。執行日誌如下:
當前沒有發現服務
訂單服務http://127.0.0.1:8080/OrderService/delOrder 該方法需要的引數為:[orderId]
訂單服務http://127.0.0.1:8080/OrderService/addOrder 該方法需要的引數為:[createTime, state]
---------------------
訂單服務http://127.0.0.1:8081/OrderService/delOrder 該方法需要的引數為:[orderId]
訂單服務http://127.0.0.1:8081/OrderService/addOrder 該方法需要的引數為:[createTime, state]
訂單服務http://127.0.0.1:8080/OrderService/delOrder 該方法需要的引數為:[orderId]
訂單服務http://127.0.0.1:8080/OrderService/addOrder 該方法需要的引數為:[createTime, state]
---------------------
訂單服務http://127.0.0.1:8081/OrderService/delOrder 該方法需要的引數為:[orderId]
訂單服務http://127.0.0.1:8081/OrderService/addOrder 該方法需要的引數為:[createTime, state]
---------------------
當前沒有發現服務