Service Discovery Based on KV Stores such as etcd/ZooKeeper
GitHub source: suniper/plum-mesh-agent
Using the code of my previously completed open-source project suniper-pma as an example, this post explains how my service discovery and load-balancing framework is implemented. (Note: for readability, the code shown below has been modified slightly from the original source.)
In theory any KV store can serve as the registry; here I use etcd and ZooKeeper as the two example media to walk through the implementation of service discovery.
Implementation principles
- Register the service information into child nodes of a designated node in the KV store;
- Initialize the list of available services by reading that designated node from the KV store;
- Watch the node in the KV store for changes, so that the list of available services stays up to date.
Implementing the above clearly requires a generic KV store driver (Generic Driver) to carry out the necessary operations. As the third point suggests, I also made watching node changes in the KV store a responsibility of this driver. Anticipating the operations we are likely to need, the interface is defined as follows:
```java
public interface KVStore extends AutoCloseable {

    /**
     * Get node and its data, return null when it does not exist
     *
     * @param key node name
     * @return Node info
     * @throws Exception exception during operation
     */
    Node get(String key) throws Exception;

    /**
     * List all child nodes and their data, return an empty list when
     * the parent node does not exist or has no children
     *
     * @param prefix parent node
     * @return List of Node
     * @throws Exception exception during operation
     */
    List<Node> list(String prefix) throws Exception;

    /**
     * List the names of all child nodes,
     * return an empty list when the parent node does not exist or has no children
     *
     * @param prefix parent node
     * @return List of node names
     * @throws Exception exception during operation
     */
    List<String> listKeys(String prefix) throws Exception;

    /**
     * Update the data of the node and persist it.
     * <p>
     * Updates if the node already exists,
     * but does not modify the type of the node (ephemeral/persistent)
     *
     * @param key   node name (configuration directory)
     * @param value node data (registered service information)
     * @return revision
     * @throws Exception exception during operation
     */
    long put(String key, String value) throws Exception;

    /**
     * Update the data of the node and persist it.
     * <p>
     * Updates if the node already exists,
     * but does not modify the type of the node (ephemeral/persistent)
     *
     * @param key       node name (configuration directory)
     * @param value     node data (registered service information)
     * @param ephemeral whether this is an ephemeral node
     * @return revision
     * @throws Exception exception during operation
     */
    long put(String key, String value, boolean ephemeral) throws Exception;

    /**
     * Delete the node
     *
     * @param key node name
     * @return number of nodes successfully deleted
     * @throws Exception IllegalArgumentException: node is not empty
     * @throws Exception exception during operation
     */
    long delete(String key) throws Exception;

    /**
     * Whether the node exists
     *
     * @param key node name
     * @return true if it exists
     * @throws Exception exception during operation
     */
    boolean exists(String key) throws Exception;

    /**
     * Watch changes of all child nodes (excluding the parent node), watching continuously
     *
     * @param key      name of the parent node
     * @param consumer callback invoked when a child node changes
     * @throws Exception exception during operation
     */
    void watchChildren(String key, BiConsumer<Event, Node> consumer) throws Exception;

    /**
     * Watch changes of all child nodes (excluding the parent node),
     * exiting the watch when the exit signal says so
     *
     * @param key              name of the parent node
     * @param exitSignSupplier provides an exit signal; otherwise the watch runs forever
     * @param consumer         callback invoked when a child node changes
     * @throws Exception exception during operation
     */
    void watchChildren(String key, Supplier<Boolean> exitSignSupplier, BiConsumer<Event, Node> consumer) throws Exception;

    /**
     * Create the parent node (prefix), as a persistent node
     *
     * @param parentNode name of the parent node
     * @throws Exception exception during operation
     */
    void createParentNode(String parentNode) throws Exception;
}
```
On the one hand, with the `KVStore` interface we can factor the service discovery logic out and develop it without caring about the storage medium (the concrete KVStore implementation). On the other hand, since in this design the watching of node changes also lives in this driver, KVStore's dynamic watch over the medium's child nodes, i.e. the `watchChildren` method, closes the essential loop of service discovery.
You have probably also noticed the `put(String key, String value, boolean ephemeral)` method: yes, a KVStore must be able to register ephemeral nodes. When a service process starts, it registers its own information into the KVStore and maintains a heartbeat to keep the node alive, so that when the service crashes or stops, the information stored in the KV store expires automatically. This guarantees that service discovery only ever sees a list of live services.
Additionally, the state changes observed on service nodes are divided into three categories:
```java
public enum Event {
    UPDATE, DELETE, UNRECOGNIZED
}
```
The etcd implementation of the KV store
Source: plum-mesh-discovery-etcd
The basic etcd operations are covered by `com.coreos.jetcd` 0.0.2; we only need to figure out how to watch child nodes and how to create ephemeral nodes.
Watching child nodes
etcd's storage is a flat KV structure, similar to Aliyun's OSS, which can be queried directly by `prefix`. For example:
- kv: v1
    - zookeeper: v2
        - content: v3
    - etcd: v4
    - mongo: v5
etcd can express the hierarchy above through its keys, but in storage they are actually just a flat series of keys:
- kv/zookeeper
- kv/zookeeper/content
- kv/etcd
- kv/mongo
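This flat layout also makes `list(prefix)` a single prefix query. Below is a minimal sketch against the `com.coreos.jetcd` 0.0.2 API; note that `GetOption.newBuilder().withPrefix(...)` is assumed here by analogy with the `WatchOption` used in the watch code below, and `kv2NodeInfo` is the project's KeyValue-to-Node converter:

```java
@Override
public List<Node> list(String prefix) throws ExecutionException, InterruptedException {
    ByteSequence storeKey = ByteSequence.fromString(prefix);
    KV kv = client.getKVClient();
    // a range query with the prefix option returns every key under the prefix
    GetResponse response = kv.get(storeKey,
            GetOption.newBuilder().withPrefix(storeKey).build()).get();
    return response.getKvs().stream()
            .filter(keyValue -> !keyValue.getKey().equals(storeKey)) // skip the parent itself
            .map(this::kv2NodeInfo)
            .collect(Collectors.toList());
}
```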
So in etcd it is convenient to watch all the "child nodes" by `prefix` and invoke the `Consumer` callback on each node-change event:
```java
@Override
public void watchChildren(String key, Supplier<Boolean> exitSignSupplier,
                          BiConsumer<Event, Node> consumer) throws InterruptedException {
    ByteSequence storeKey = Optional.ofNullable(key)
            .map(ByteSequence::fromString)
            .orElse(null);
    try (Watch watch = client.getWatchClient();
         Watch.Watcher watcher = watch.watch(storeKey,
                 WatchOption.newBuilder().withPrefix(storeKey).build())) {
        while (!exitSignSupplier.get()) {
            WatchResponse response = watcher.listen();
            response.getEvents().forEach(watchEvent -> {
                // skip changes of the root node itself
                if (watchEvent.getKeyValue().getKey().equals(storeKey)) return;

                Event event; // the Event enum defined in the previous section
                switch (watchEvent.getEventType()) {
                    case PUT:
                        event = Event.UPDATE;
                        break;
                    case DELETE:
                        event = Event.DELETE;
                        break;
                    default:
                        event = Event.UNRECOGNIZED;
                }
                KeyValue keyValue = watchEvent.getKeyValue();
                Node info = kv2NodeInfo(keyValue);
                consumer.accept(event, info);
            });
        }
    }
}
```
Creating ephemeral nodes
Unfortunately, etcd offers no ephemeral-node feature, but it does provide the concept of a lease (Lease): we can create a lease while initializing the client and renew it automatically before it expires. Once the service crashes or stops, nothing renews the lease anymore, and the node expires automatically.
Generate the lease and keep renewing it automatically:
```java
private static final int EPHEMERAL_LEASE = 60; // seconds

private Client client;
private long leaseId;

private void initLease() throws ExecutionException, InterruptedException {
    Lease lease = client.getLeaseClient();
    LeaseGrantResponse response = lease.grant(EPHEMERAL_LEASE).get();
    leaseId = response.getID();
    lease.keepAlive(leaseId);
}
```
The put method that can create ephemeral nodes:
```java
@Override
public long put(String key, String value, boolean ephemeral) throws ExecutionException, InterruptedException {
    log.debug(String.format("put %s to %s", value, key));
    KV kv = client.getKVClient();
    ByteSequence storeKey = Optional.ofNullable(key)
            .map(ByteSequence::fromString)
            .orElse(null);
    ByteSequence storeValue = Optional.ofNullable(value)
            .map(ByteSequence::fromString)
            .orElse(null);

    PutOption.Builder builder = PutOption.newBuilder();
    if (ephemeral) builder.withLeaseId(leaseId);

    PutResponse response = kv.put(storeKey, storeValue, builder.build()).get();
    log.info(String.format("put key-value: key: %s, reversion: %s, has-prev: %s, ephemeral: %s",
            key, response.getHeader().getRevision(), response.hasPrevKv(), ephemeral));
    return response.getHeader().getRevision();
}
```
Because lease expiry lags by up to `EPHEMERAL_LEASE` seconds, service discovery has some chance of briefly returning services that are already dead.
The ZooKeeper implementation of the KV store
Source: plum-mesh-discovery-zk
The basic ZooKeeper operations are covered by `org.apache.zookeeper` (latest version at the time of writing: 3.4.12). As with etcd, we only need to figure out how to watch child nodes and how to create ephemeral nodes.
Watching child nodes
ZooKeeper provides `Watcher`, which can watch the changes of a single node. Take the following hierarchy as an example:
- kv: v1
    - zookeeper: v2
        - content: v3
    - etcd: v4
    - mongo: v5
1. Unlike etcd, ZooKeeper nodes follow a file-system-like hierarchy with a strict one-to-many parent/children relationship, and there is no way to list all descendants recursively by `prefix`.
2. A ZooKeeper `Watcher` can watch only one node, and once it has fired its event callback, the `Watcher` is spent.
3. A `Watcher` can watch events on the node itself (update, delete, and so on) as well as child-update events (`NodeChildrenChanged`).
These three points are enough to build etcd-like child-node watching: watch the children of `kv/`, and fire the callback whenever a node is added or `kv/zookeeper`, `kv/etcd`, or `kv/mongo` changes (changes to `kv/zookeeper/content` need not concern us):
- Watch the changes of the node and its children:
    - when the node itself changes, do nothing;
    - when a child node changes, create a corresponding sub-node watcher for it and invoke the callback with the change.
Implement `Watcher` for the regular per-node listener (the logic resembles the etcd implementation): routine work, invoking the `Consumer` callback according to the node-change event.
```java
/**
 * Watch changes of a single node
 */
class SubWatcher implements Watcher {
    private BiConsumer<cn.suniper.mesh.discovery.model.Event, Node> consumer;
    private Supplier<Boolean> exitSignSupplier;
    private volatile boolean stopWatch;

    SubWatcher(BiConsumer<cn.suniper.mesh.discovery.model.Event, Node> consumer,
               Supplier<Boolean> exitSignSupplier) {
        this.consumer = consumer;
        this.exitSignSupplier = exitSignSupplier;
    }

    @Override
    public void process(WatchedEvent event) {
        if (exitSignSupplier.get()) {
            log.info("sub-node: stop watch event: " + event);
            return;
        }
        log.debug("sub-node: watch event: " + event);
        cn.suniper.mesh.discovery.model.Event wrapEvent;
        Node node;
        switch (event.getType()) {
            case NodeCreated:
            case NodeDataChanged:
                wrapEvent = cn.suniper.mesh.discovery.model.Event.UPDATE;
                try {
                    node = get(event.getPath(), this);
                    log.debug(String.format("get node(%s) data: ", event.getPath()) + node);
                } catch (Throwable throwable) {
                    log.warn("error occurred in watcher", throwable);
                    return;
                }
                break;
            case NodeDeleted:
                wrapEvent = cn.suniper.mesh.discovery.model.Event.DELETE;
                node = new Node();
                node.setKey(event.getPath());
                break;
            default:
                wrapEvent = cn.suniper.mesh.discovery.model.Event.UNRECOGNIZED;
                node = new Node();
                node.setKey(event.getPath());
        }
        stopWatch = true; // a fired watcher is spent; it must be re-activated before reuse
        consumer.accept(wrapEvent, node);
    }

    private SubWatcher activate() {
        this.stopWatch = false;
        return this;
    }
}
```
Implement `Watcher` again for the root-node listener. It watches only the changes of the children under the root node: whenever the root changes (a child added or removed), it lists all nodes under it and checks whether each one has a corresponding `SubWatcher` (the child-node event listener); if not, a new watcher is created for it:
```java
class ChildrenWatcher implements Watcher {
    private final Supplier<Boolean> DEFAULT_SUPPLIER = () -> false;
    private ConcurrentHashMap<String, SubWatcher> childrenWatcher;
    private BiConsumer<cn.suniper.mesh.discovery.model.Event, Node> consumer;
    private String path;
    private Supplier<Boolean> exitSignSupplier;

    ChildrenWatcher(String path,
                    BiConsumer<cn.suniper.mesh.discovery.model.Event, Node> consumer,
                    Supplier<Boolean> exitSignSupplier) {
        this.consumer = consumer;
        this.childrenWatcher = new ConcurrentHashMap<>();
        this.path = path;
        this.exitSignSupplier = exitSignSupplier == null ? DEFAULT_SUPPLIER : exitSignSupplier;
        this.listAndWatch(false);
    }

    @Override
    public void process(WatchedEvent event) {
        log.debug("parent-node: watch event: " + event);
        if (exitSignSupplier.get()) {
            log.info("parent-node: stop watch event: " + event);
            return;
        }
        switch (event.getType()) {
            case NodeChildrenChanged:
                this.listAndWatch(true);
                return;
            default:
                log.debug("ignore event");
        }
        try {
            // re-register this watcher to keep watching the parent node
            List<String> res = listKeys(path, this);
            log.debug(String.format("Sub nodes of %s: %s", path, res));
        } catch (Exception e) {
            log.warn("failed to keep watch node: " + path, e);
        }
    }

    private void listAndWatch(boolean accept) {
        try {
            List<String> subList = listKeys(path, this);
            log.debug(String.format("size of %s: %s", path, subList.size()));
            for (String sub : subList) {
                SubWatcher watcher = childrenWatcher.computeIfAbsent(sub, k -> {
                    log.debug("create new watcher for " + sub);
                    SubWatcher newWatcher = new SubWatcher(consumer, exitSignSupplier);
                    newWatcher.stopWatch = true;
                    return newWatcher;
                });
                if (!watcher.stopWatch) continue; // still active, nothing to do

                log.debug("activate watcher for " + sub);
                watcher.activate();
                if (accept) {
                    Node node = get(sub, watcher);
                    consumer.accept(cn.suniper.mesh.discovery.model.Event.UPDATE, node);
                } else {
                    zooKeeper.exists(sub, watcher, null, null);
                }
            }
        } catch (Exception e) {
            log.warn("failed to list and watch node: " + path, e);
        }
    }
}
```
Creating ephemeral nodes
ZooKeeper supports ephemeral nodes natively: the client keeps a heartbeat with the server, and when the client exits and the server detects the session timeout, the ephemeral node is deleted automatically.
```java
@Override
public long put(String key, String value, boolean ephemeral) throws Exception {
    Stat stat = zooKeeper.exists(key, null);
    if (stat != null) {
        zooKeeper.setData(key, value.getBytes(), stat.getVersion());
    } else {
        CreateMode mode = ephemeral ? CreateMode.EPHEMERAL : CreateMode.PERSISTENT;
        zooKeeper.create(key, value.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);
    }
    stat = zooKeeper.exists(key, null);
    log.info(String.format("current stat: %s", stat));
    return stat.getMzxid();
}
```
On creating multi-level parent nodes
Every level of a ZK node's parent path must be a real node; unlike etcd there are no virtual prefixes, so parent nodes must be created recursively:
```java
@Override
public void createParentNode(String parentNode) throws Exception {
    PathUtils.validatePath(parentNode);
    if (parentNode.equals("/")) return;

    File node = new File(parentNode);
    String parent = node.getParent();
    if (!exists(parent)) {
        createParentNode(parent);
    }
    put(parentNode, DEFAULT_NODE_VALUE);
}
```
Service registration and discovery on top of KVStore
Source: plum-mesh-discovery-core
Service registration
- Whichever KV store is used, service information is registered under a designated root node, here `/config/suniper`;
- When registering a service, the Register creates a node named after the server group under the root => `/config/suniper/{ServerGroup}`;
- The service information itself is stored in a child node: key `/config/suniper/{ServerGroup}/{AppName}` <=> value `ip/port/weight`;
- The node holding the service information is registered as an ephemeral node, and the Register keeps the connection alive in a daemon thread; therefore every KV store must guarantee that a node expires automatically within some time after its client disconnects (the ephemeral-node registration provided by KVStore gives us exactly this ability).
The service provider (Provider) and the registration-related information are wrapped as follows (some fields exist for future extensions and are not covered here):
```java
public class ProviderInfo {
    // provider
    private String name;
    private String ip;
    private int weight;
    private int port;
    private long version;
}

public class Application {
    private List<String> registryUrlList; // host:port list of the registry
    private ProviderInfo providerInfo;
    private String serverGroup;           // the server group this provider belongs to
}
```
Create the node `/config/suniper/{ServerGroup}` and register the provider information under it:
```java
public void register(Application application) throws Exception {
    String parentNode = String.join("/", Constants.STORE_ROOT, application.getServerGroup());
    ProviderInfo providerInfo = Optional.ofNullable(application.getProviderInfo())
            .orElse(new ProviderInfo());
    if (providerInfo.getIp() == null) {
        InetAddress address = HostUtil.getLocalIv4Address();
        if (address != null) providerInfo.setIp(address.getHostAddress());
        else throw new IllegalStateException("cannot get local host IP address");
    }
    if (providerInfo.getPort() == 0 || providerInfo.getName() == null) {
        throw new IllegalArgumentException("please check your provider info");
    }

    String storeValue = String.format("%s/%s/%s",
            providerInfo.getIp(), providerInfo.getPort(), providerInfo.getWeight());
    String storeKey = String.join("/", parentNode, providerInfo.getName());

    // check and create the parent node
    store.createParentNode(parentNode);
    long reversion = store.put(storeKey, storeValue, true);
    log.info(String.format("registered server: `%s` in node: `%s`, reversion: %s",
            storeKey, storeValue, reversion));
}
```
Normally we call the register method at service startup to register the service with the zk/etcd registry, for example:
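A minimal startup sketch; the `Register` wrapper and `EtcdKVStore` names here are assumptions for illustration, and the usual getters/setters on ProviderInfo/Application are assumed:

```java
// hypothetical wiring; exact class names in suniper-pma may differ
ProviderInfo provider = new ProviderInfo();
provider.setName("order-api"); // node name under the server group
provider.setPort(8080);
provider.setWeight(5);         // ip is auto-detected when left empty

Application app = new Application();
app.setRegistryUrlList(Arrays.asList("10.0.0.2:2379"));
app.setServerGroup("web");
app.setProviderInfo(provider);

// registers the ephemeral node /config/suniper/web/order-api -> {ip}/8080/5
new Register(new EtcdKVStore(app.getRegistryUrlList())).register(app);
```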
Service discovery
The service registration section fixed the rule for generating node paths, so given a server group `{ServerGroup}` we can list the root node `/config/suniper/{ServerGroup}` to obtain all available services in that group, and by watching that root (`KVStore.watchChildren`) we receive node updates in real time.
1. Calling `KVStore.list("/config/suniper/{ServerGroup}")` is enough to obtain the initial server list.
2. Watching the root node keeps the list up to date in real time: `watchChildren(String key, Supplier<Boolean> exitSignSupplier, BiConsumer<Event, Node> consumer)`
`exitSignSupplier` is a generator of the exit signal: when the watch is no longer needed, just have the supplier emit `true`. I won't elaborate on it beyond the sketch below; see the source for the full implementation.
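For example, an `AtomicBoolean` makes a convenient exit signal (a usage sketch; the group path and the background thread are illustrative):

```java
// usage sketch: the watch loop blocks, so run it in its own thread and
// stop it by flipping a thread-safe flag
AtomicBoolean stopped = new AtomicBoolean(false);
new Thread(() -> {
    try {
        store.watchChildren("/config/suniper/web", stopped::get, consumer);
    } catch (Exception e) {
        // the watch ended abnormally
    }
}).start();

// ... later, during shutdown:
stopped.set(true);
```

Note that the etcd implementation only rechecks the signal after `watcher.listen()` returns, so the loop actually exits on the next event.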
Let's focus on how the `BiConsumer<Event, Node>` consumer is implemented; the code and its comments should be self-explanatory:
```java
private class Holder implements BiConsumer<Event, Node> {
    // caches the mapping from node path to provider info (this map must be thread-safe)
    private Map<String, ProviderInfo> providerInfoMap;
    // performs the actual server-list update; knowing its behavior is enough here
    private UpdateAction updateAction;
    // records when the list was last updated
    private final AtomicLong lastUpdated = new AtomicLong(System.currentTimeMillis());

    Holder(UpdateAction updateAction, Map<String, ProviderInfo> providerInfoMap) {
        this.providerInfoMap = providerInfoMap;
        this.updateAction = updateAction;
    }

    @Override
    public void accept(Event event, Node node) {
        // all events fall into two classes: delete and update
        String key = node.getKey();
        switch (event) {
            case DELETE:
                // when a child node dies (is deleted), remove the locally cached provider info
                ProviderInfo removed = providerInfoMap.remove(key);
                if (removed != null)
                    log.info(String.format("Service offline: %s - %s:%s",
                            key, removed.getIp(), removed.getPort()));
                else
                    log.info(String.format("Service offline: %s - no such provider cache", key));
                update();
                break;
            case UPDATE:
                // when a child node is added or its data changes
                ProviderInfo oldInfo = providerInfoMap.computeIfAbsent(key, k -> new ProviderInfo());
                // node2Provider converts node info into provider info, updating the old
                // provider in place (reusing the cache to reduce object creation)
                MapperUtil.node2Provider(node, oldInfo);
                update();
                break;
            default:
                log.info(String.format("unrecognized event: %s, key: %s", event, node.getKey()));
        }
    }

    private void update() {
        // notify the updateAction to refresh the cached server list
        lastUpdated.set(System.currentTimeMillis());
        updateAction.doUpdate();
    }
}
```
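`MapperUtil.node2Provider` is not shown in this post; here is a minimal sketch of what it plausibly does, assuming the `ip/port/weight` value format established during registration and a `Node#getValue()` accessor for the raw stored value (the parsing details are an assumption, not the project's exact code):

```java
// sketch under assumptions: key = /config/suniper/{group}/{name}, value = "ip/port/weight"
public static ProviderInfo node2Provider(Node node, ProviderInfo info) {
    String key = node.getKey();
    info.setName(key.substring(key.lastIndexOf('/') + 1)); // last path segment is the provider name
    String[] parts = node.getValue().split("/");
    info.setIp(parts[0]);
    info.setPort(Integer.parseInt(parts[1]));
    info.setWeight(Integer.parseInt(parts[2]));
    return info;
}

// the single-argument overload used below simply starts from a fresh ProviderInfo
public static ProviderInfo node2Provider(Node node) {
    return node2Provider(node, new ProviderInfo());
}
```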
Fetching and refreshing the list happen in one method: each call lists all child nodes in the registry and refreshes the local provider cache.
```java
List<RegisteredServer> obtainServerListByKvStore(Map<String, ProviderInfo> providerInfoMap) {
    // providerInfoMap: as above, caches node path -> provider info (must be thread-safe)
    Set<String> newKeys = new HashSet<>();
    Consumer<Node> collectNewNodesToSet = node -> newKeys.add(node.getKey());
    try {
        List<Node> nodeInfoList = store.list(parentNode);
        Stream<ProviderInfo> stream = nodeInfoList
                .stream()
                .peek(collectNewNodesToSet) // collect every node path into the set
                .map(node -> providerInfoMap.computeIfAbsent(
                        node.getKey(),
                        // convert the node into a ProviderInfo and cache it
                        k -> MapperUtil.node2Provider(node)));

        providerInfoMap.keySet()
                .parallelStream()
                // providers that no longer appear in store.list are dead
                .filter(k -> !newKeys.contains(k))
                // evict the dead provider info
                .forEach(providerInfoMap::remove);

        return map2ServerList(stream); // convert what remains into the server list
    } catch (Exception e) {
        log.warn(String.format("failed to obtain list of servers: %s", parentNode), e);
        return Lists.newArrayList();
    }
}
```
Implementing load balancing
There are many load-balancing algorithms. To keep pma highly available, I used the load-balancing module `ribbon-loadbalancer` from Netflix/ribbon:
```xml
<dependency>
    <groupId>com.netflix.ribbon</groupId>
    <artifactId>ribbon-core</artifactId>
    <version>${ribbon.version}</version>
</dependency>
<dependency>
    <groupId>com.netflix.ribbon</groupId>
    <artifactId>ribbon-loadbalancer</artifactId>
    <version>${ribbon.version}</version>
</dependency>
```
To hook into ribbon, we need to implement or extend the following interfaces/classes:
- Server meta: com.netflix.loadbalancer.Server
- Server list updater: com.netflix.loadbalancer.ServerListUpdater
- Dynamic server list: com.netflix.loadbalancer.ServerList
Once these are implemented, the load balancer can be used the way Netflix/ribbon documents it: doc
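For instance, a balancer might be assembled roughly like this. This is a sketch assuming the `RegistryServerList`/`RegistryServerListUpdater` implementations described below; `LoadBalancerBuilder` is ribbon's own builder, though I'm going from the 2.x source rather than any documentation:

```java
// sketch: wiring the custom ServerList and ServerListUpdater into ribbon
ServerList<RegisteredServer> serverList = new RegistryServerList(store, "/config/suniper/web");
ServerListUpdater updater = new RegistryServerListUpdater(store, "/config/suniper/web");

ZoneAwareLoadBalancer<RegisteredServer> lb = LoadBalancerBuilder.<RegisteredServer>newBuilder()
        .withDynamicServerList(serverList)
        .withServerListUpdater(updater)
        .withRule(new RoundRobinRule())
        .buildDynamicServerListLoadBalancerWithUpdater();

Server chosen = lb.chooseServer(null); // pick the next server according to the rule
```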
Dynamic server list
The dynamic server list implements `com.netflix.loadbalancer.ServerList`; only the following methods are needed:
```java
public List<T> getInitialListOfServers();
public List<T> getUpdatedListOfServers();
```
As mentioned in the service discovery section above, `obtainServerListByKvStore` fetches and refreshes the available server list, so both methods can simply return its result, as in the sketch below.
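A minimal sketch of such a list; the class name and constructor are assumptions for illustration, and `obtainServerListByKvStore` is the method shown earlier, assumed to live in the same class:

```java
// sketch: both methods delegate to obtainServerListByKvStore, which lists the
// registry and refreshes the local provider cache
public class RegistryServerList implements ServerList<RegisteredServer> {
    private final Map<String, ProviderInfo> providerInfoMap = new ConcurrentHashMap<>();
    private final KVStore store;
    private final String parentNode;

    public RegistryServerList(KVStore store, String parentNode) {
        this.store = store;
        this.parentNode = parentNode;
    }

    @Override
    public List<RegisteredServer> getInitialListOfServers() {
        return obtainServerListByKvStore(providerInfoMap);
    }

    @Override
    public List<RegisteredServer> getUpdatedListOfServers() {
        return obtainServerListByKvStore(providerInfoMap);
    }
}
```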
Server list updater
The server list updater needs to implement the following interface:
```java
public interface ServerListUpdater {

    /**
     * start the serverList updater with the given update action
     * This call should be idempotent.
     *
     * @param updateAction
     */
    void start(UpdateAction updateAction);

    /**
     * stop the serverList updater. This call should be idempotent
     */
    void stop();

    /**
     * @return the last update timestamp as a {@link java.util.Date} string
     */
    String getLastUpdate();

    /**
     * @return the number of ms that has elapsed since last update
     */
    long getDurationSinceLastUpdateMs();

    /**
     * @return the number of update cycles missed, if valid
     */
    int getNumberMissedCycles();

    /**
     * @return the number of threads used, if valid
     */
    int getCoreThreads();
}
```
In fact these methods overlap heavily with the `Consumer` described in the service discovery section: we only need to wire the `Holder` implemented above into the updater. The full implementation is in the source, RegistryServerListUpdater.java; a rough sketch follows.
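A minimal sketch of how `start` might wire the watch to the `Holder`. The threading and the metric bookkeeping here are assumptions; the real RegistryServerListUpdater differs:

```java
// sketch: start() launches a daemon thread that watches the registry and feeds
// every child-node event into the Holder; stop() flips the exit signal
public class RegistryServerListUpdater implements ServerListUpdater {
    private final KVStore store;
    private final String parentNode;
    private final Map<String, ProviderInfo> providerInfoMap = new ConcurrentHashMap<>();
    private final AtomicBoolean stopped = new AtomicBoolean(true);
    private final AtomicLong lastUpdated = new AtomicLong(System.currentTimeMillis());

    public RegistryServerListUpdater(KVStore store, String parentNode) {
        this.store = store;
        this.parentNode = parentNode;
    }

    @Override
    public void start(UpdateAction updateAction) {
        if (!stopped.compareAndSet(true, false)) return; // idempotent start
        UpdateAction tracked = () -> {                    // record update times for the metrics below
            lastUpdated.set(System.currentTimeMillis());
            updateAction.doUpdate();
        };
        Thread watchThread = new Thread(() -> {
            try {
                store.watchChildren(parentNode, stopped::get, new Holder(tracked, providerInfoMap));
            } catch (Exception e) {
                log.warn("watch loop terminated", e);
            }
        });
        watchThread.setDaemon(true); // daemon thread keeps the registry connection alive
        watchThread.start();
    }

    @Override
    public void stop() {
        stopped.set(true); // idempotent: the exit signal ends the watch loop
    }

    @Override
    public String getLastUpdate() {
        return new Date(lastUpdated.get()).toString();
    }

    @Override
    public long getDurationSinceLastUpdateMs() {
        return System.currentTimeMillis() - lastUpdated.get();
    }

    @Override
    public int getNumberMissedCycles() {
        return 0; // event-driven rather than cycle-based, so nothing to miss
    }

    @Override
    public int getCoreThreads() {
        return 1; // the single watch thread
    }
}
```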
That concludes this rough tour of the implementation of suniper-pma's service discovery and load-balancing components.
BTW
- The documentation of `ribbon-loadbalancer` is really terse; many things are unclear and un-googleable, so the only way to learn how it works and how to use it is to read the source.
- ribbon's official docs don't say which artifacts you should add to your pom, and in the pom of `ribbon-loadbalancer-2.3.0` every dependency is declared with `<scope>runtime</scope>`, which tripped me up again and again. I wanted to see how others use this module, but all I could find were people using the spring-cloud integration.
On the WeightedResponseTimeRule load-balancing rule
The rule that use the average/percentile response times to assign dynamic “weights” per Server which is then used in the “Weighted Round Robin” fashion.
ribbon's load balancer ships a `WeightedResponseTimeRule`, but the docs barely describe it, and I couldn't even find the place in the source where the weights are recomputed each round. I asked on StackOverflow and there is no answer yet, so I'll leave a marker here: WeightedResponseTimeRule: how it work in ribbon?
For reference, this is ribbon's `AbstractLoadBalancerAwareClient.executeWithLoadBalancer`, the entry point that routes a request through the load balancer:

```java
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
    try {
        return command.submit(
                new ServerOperation<T>() {
                    @Override
                    public Observable<T> call(Server server) {
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        } catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
    } catch (Exception e) {
        Throwable t = e.getCause();
        if (t instanceof ClientException) {
            throw (ClientException) t;
        } else {
            throw new ClientException(e);
        }
    }
}
```