1. 程式人生 > >使用Akka構建集群(一)

使用Akka構建集群(一)

tps 改變 state err 狀態改變 ted leave ali 清單

概述

  Akka提供的非常吸引人的特性之一就是輕松構建自定義集群,這也是我要選擇Akka的最基本原因之一。如果你不想敲太多代碼,也可以通過簡單的配置構建一個非常簡單的集群。本文為說明Akka集群構建的學習成本低廉,以Akka官網的例子代碼出發,進行簡單改造後與Spring集成,有關Spring集成的信息你可以選擇閱讀《Spring與Akka的集成》一文。本文所講述的是一款十分簡便的集群監聽器,它通過訂閱集群成員的消息,對整個集群的成員進行管理(管理的方式只是打印一行日誌)。

Akka集群規範

  根據Akka官網的描述——Akka集群特性提供了容錯的、去中心化的、基於集群成員關系點對點的,不存在單點問題、單點瓶頸的服務。其實現原理為閑聊協議和失敗檢查。

集群概念

  • 節點(node):集群中的邏輯成員。允許一臺物理機上有多個節點。由元組hostname:port:uid唯一確定。
  • 集群(cluster):由成員關系服務構建的一組節點。
  • 領導(leader):集群中唯一扮演領導角色的節點。
  • 種子節點(seed node):作為其他節點加入集群的連接點的節點。實際上,一個節點可以通過向集群中的任何一個節點發送Join(加入)命令加入集群。

節點狀態

這裏以Akka官網提供的成員狀態狀態圖為例,如圖1所示。

技術分享圖片

圖1

圖1展示了狀態轉換的兩個因素:動作和狀態。

狀態

  • joining:節點正在加入集群時的狀態。
  • weekly up:配置了akka.cluster.allow-weakly-up-members=on時,啟用的狀態。
  • up:集群中節點的正常狀態。
  • leaving/exiting:優雅的刪除節點時,節點的狀態。
  • down:標記為已下線的狀態。
  • removed:墓碑狀態,表示已經不再是集群的成員。

動作

  • join:加入集群。
  • leave:告知節點優雅的離開集群。
  • down:標記集群為已下線。

配置

  本節將要展示構建集群所需要的最基本的配置,幾乎不會引入過多的開發成本,一個集群就構建完成了。application.conf文件的內容如下:

akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 2551
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://[email protected]:2551",
      "akka.tcp://[email protected]:2552"]

    #//#snippet
    # excluded from snippet
    auto-down-unreachable-after = 10s
    #//#snippet
    # auto downing is NOT safe for production deployments.
    # you may want to use it during development, read more about it in the docs.
    #
    # auto-down-unreachable-after = 10s
    
    # Disable legacy metrics in akka-cluster.
	metrics.enabled=off
  }

}

此配置文件與我在《使用Akka的遠程調用》一文中的配置有很多不同:

  1. provider不再是akka.remote.RemoteActorRefProvider,而是akka.cluster.ClusterActorRefProvider。這說明ActorRef將由akka.cluster.ClusterActorRefProvider提供;
  2. 增加了cluster配置;

cluster配置詳解

首先任何一個集群都需要種子節點,作為基本的加入集群的連接點。本例中以我本地的兩個節點(分別監聽2551和2552端口)作為種子節點。無論配置了多少個種子節點,除了在seed-nodes中配置的第一個種子節點需要率先啟動之外(否則其它種子節點無法初始化並且其它節點也無法加入),其余種子節點都是啟動順序無關的。第一個節點需要率先啟動的另一個原因是如果每個節點都可以率先啟動,那麽有可能造成一個集群出現幾個種子節點都啟動並且加入了自己的集群,此時整個集群實際上分裂為幾個集群,造成孤島。當你啟動了超過2個以上的種子節點,那麽第一個啟動的種子節點是可以關閉下線的。如果第一個種子節點重啟了,它將不會在自己創建集群而是向其它種子節點發送Join消息加入已存在的集群。 註意:除了akka.remote.netty.tcp.port配置項指定的端口不同,所有加入集群節點的application.conf可以完全一樣。如果akka.remote.netty.tcp.port未指定,那麽Akka會為你隨機選擇其他未占用的端口。

簡單集群監聽器

  我們創建一個簡單的集群監聽器SimpleClusterListener(實際上是一個Actor,因為繼承了UntypedActor),它向集群訂閱MemberEvent(成員事件)和UnreachableMember(不可達成員)兩種消息,來對集群成員進行管理(打印),其實現見代碼清單1所示。

代碼清單1

@Named("SimpleClusterListener")
@Scope("prototype")
public class SimpleClusterListener extends UntypedActor {
	LoggingAdapter log = Logging.getLogger(getContext().system(), this);
	Cluster cluster = Cluster.get(getContext().system());

	// subscribe to cluster changes
	@Override
	public void preStart() {
		// #subscribe
		cluster.subscribe(getSelf(), ClusterEvent.initialStateAsEvents(), MemberEvent.class, UnreachableMember.class);
		// #subscribe
	}

	// re-subscribe when restart
	@Override
	public void postStop() {
		cluster.unsubscribe(getSelf());
	}

	@Override
	public void onReceive(Object message) {
		if (message instanceof MemberUp) {
			MemberUp mUp = (MemberUp) message;
			log.info("Member is Up: {}", mUp.member());

		} else if (message instanceof UnreachableMember) {
			UnreachableMember mUnreachable = (UnreachableMember) message;
			log.info("Member detected as unreachable: {}", mUnreachable.member());

		} else if (message instanceof MemberRemoved) {
			MemberRemoved mRemoved = (MemberRemoved) message;
			log.info("Member is Removed: {}", mRemoved.member());

		} else if (message instanceof MemberEvent) {
			// ignore

		} else {
			unhandled(message);
		}
	}
}

運行展示

初始化的代碼如下:
		logger.info("Start simpleClusterListener");
		final ActorRef simpleClusterListener = actorSystem.actorOf(springExt.props("SimpleClusterListener"), "simpleClusterListener");
		actorMap.put("simpleClusterListener", simpleClusterListener);
		logger.info("Started simpleClusterListener");

我們首先啟動第一個種子節點,配置跟第一小節完全一致。我們觀察SimpleClusterListener的日誌輸出如下圖所示。

技術分享圖片

我們再啟動第二個種子節點,其配置的akka.remote.netty.tcp.port為2552,我們觀察SimpleClusterListener的日誌輸出如下圖所示。

技術分享圖片

我們再啟動一個非種子節點,沒有為其指定akka.remote.netty.tcp.port,我們觀察SimpleClusterListener的日誌輸出如下圖所示。

技術分享圖片

可以看到新加入的節點信息被SimpleClusterListener打印出來了,細心的同學可能發現了一些Akka集群中各個節點的狀態遷移信息,第一個種子節點正在加入自身創建的集群時的狀態時JOINING,由於第一個種子節點將自己率先選舉為Leader,因此它還將自己的狀態改變為Up。後面它還將第二個種子節點和第三個節點從JOINING轉換到Up狀態。

我們停止第三個加入的節點,我們觀察SimpleClusterListener的日誌輸出如下圖所示。

技術分享圖片

可以看到其狀態首先被標記為Down,最後被轉換為Removed。

總結

  通過以上介紹相信大家對使用Akka構建集群有了基本的認識,是不是很輕松?

後記:經過近一年的準備,《Spark內核設計的藝術 架構設計與實現》一書現已出版發行,圖書如圖: 技術分享圖片 售賣鏈接如下: 京東:https://item.jd.com/12302500.html

使用Akka構建集群(一)