1. 程式人生 > >Yarn之ResourceManager詳細分析筆記(一)

Yarn之ResourceManager詳細分析筆記(一)

Yarn之ResourceManager詳細分析筆記(一)

 

 

 

http://zengzhaozheng.blog.51cto.com/8219051/1438204/

 

 

2014-07-15 08:58:18











http://zengzhaozheng.blog.51cto.com/8219051/1438204

 

 

一、概述

    本文將介紹ResourceManager在Yarn中的功能作用,從更細的粒度分析RM內部組成的各個元件功能和他們相互的互動方式。

二、ResourceManager的互動協議與基本職能

1、ResourceManager互動協議

    在整個Yarn框架中主要涉及到7個協議,分別是ApplicationClientProtocol、MRClientProtocol、ContainerManagementProtocol、ApplicationMasterProtocol、ResourceTracker、LocalizationProtocol、TaskUmbilicalProtocol,這些協議封裝了各個元件互動的資訊。ResourceManager現實功能需要和NodeManager以及ApplicationMaster進行資訊互動,其中涉及到的RPC協議有ResourceTrackerProtocol、ApplicationMasterProtocol和ResourceTrackerProtocol。

  • ResourceTracker

     

    NodeManager通過該協議向ResourceManager中註冊、彙報節點健康情況以及Container的執行狀態,並且領取ResourceManager下達的重新初始化、清理Container等命令。NodeManager和ResourceManager這種RPC通訊採用了和MRv1類似的“pull模型”(ResourceManager充當RPC server角色,NodeManager充當RPC client角色),NodeManager週期性主動地向ResourceManager發起請求,並且領取下達給自己的命令。

  • ApplicationMasterProtocol

     

    應用程式的ApplicationMaster同過該協議向ResourceManager註冊、申請和釋放資源。該協議和上面協議同樣也是採用了“pull模型”,其中在RPC機制中,ApplicationMaster充當RPC client角色,ResourceManager充當RPC server角色。

  • ApplicationClientProtocol

     

  • 客戶端通過該協議向ResourceManager提交應用程式、控制應用程式(如殺死job)以及查詢應用程式的執行狀態等。在該RPC 協議中應用程式客戶端充當RPC client角色,ResourceManager充當RPC server角色。

整理一下ResourceManager與NodeManager、ApplicationMaster和客戶端RPC協議互動的資訊:

wKioL1O-VNujpZZVAAHCkb1a3G0251.jpg

上圖中的ResourceTrackeServer、ApplicationMasterService 、ClientRMServer是ResourceManager中處理上述功能的元件。

1、ResourceManager基本職能

ResourceManager基本職能概括起來就以下幾方面:

  • 與客戶端進行互動,處理來自於客戶端的請求,如查詢應用的執行情況等。

  • 啟動和管理各個應用的ApplicationMaster,並且為ApplicationMaster申請第一個Container用於啟動和在它執行失敗時將它重新啟動。

  • 管理NodeManager,接收來自NodeManager的資源和節點健康情況彙報,並向NodeManager下達管理資源命令,例如kill掉某個container。

  • 資源管理和排程,接收來自ApplicationMaster的資源申請,並且為其進行分配。這個是它的最重要的職能。

     

     

三、ResourceManager內部組成架構分析

    ResourceManager在底層程式碼實現上將各個功能模組分的比較細,各個模組功能具有很強的獨立性。下圖所示的是ResourceManager中的大概的功能模組組成:

wKioL1O_joPj4oBtAAOzwfCDdiQ566.jpg

1、使用者互動模組

    使用者互動模組即上圖顯示的User Service管理模組。在這裡邊還可以看到根據不同的使用者型別啟用了不同的服務進行處理,AdminService處理管理員相關請求,ClientRMService處理普通客戶相關請求,這樣使得管理員不會因為普通客戶請求太多而造成堵塞。下面看看這2個服務的具體實現程式碼:

  • ClientRMService

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

public class ClientRMService extends AbstractService implements

    ApplicationClientProtocol {

  private static final ArrayList<ApplicationReport> EMPTY_APPS_REPORT = new ArrayList<ApplicationReport>();

 

  private static final Log LOG = LogFactory.getLog(ClientRMService.class);

 

  final private AtomicInteger applicationCounter = new AtomicInteger(0);

  final private YarnScheduler scheduler;//排程器

  final private RMContext rmContext;//RM上下文物件,其包含了RM大部分執行時資訊,如節點列表、佇列列表、應用程式列表等

  private final RMAppManager rmAppManager;//app管理物件

 

  private Server server;//一個RPC Server

  protected RMDelegationTokenSecretManager rmDTSecretManager;

 

  private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);

  InetSocketAddress clientBindAddress;

  //訪問控制物件,例如,一些應用程式在提交時設定了檢視許可權的話,其他普通使用者就無法檢視。

  private final ApplicationACLsManager applicationsACLsManager;

  private final QueueACLsManager queueACLsManager;

  ......

   @Override

  protected void serviceStart() throws Exception {

    Configuration conf = getConfig();

    YarnRPC rpc = YarnRPC.create(conf);

    this.server =   //實現RPC協議ApplicationClientProtocol 

      rpc.getServer(ApplicationClientProtocol.classthis,

            clientBindAddress,

            conf, this.rmDTSecretManager,

            conf.getInt(YarnConfiguration.RM_CLIENT_THREAD_COUNT, 

                YarnConfiguration.DEFAULT_RM_CLIENT_THREAD_COUNT));

     

    // Enable service authorization?

    if (conf.getBoolean(

        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, 

        false)) {

      refreshServiceAcls(conf, new RMPolicyProvider());

    }

     

    this.server.start();

    ......

  }

從上面ClientRMService的基本程式碼架構我們可以看出:

(1)ClientRMService是一個RPC Server,主要為來自於普通客戶端的各種RPC請求。從程式碼實現的角度看,它是ApplicationClientProtocol協議的一個實現。

(2)之前我們已經說了,普通使用者可以通過該服務來獲得正在執行應用程式的相關資訊,如進度情況、應用程式列表等。上面程式碼中都將ResourceManager執行資訊封裝在RMContxt介面中了,下面來看看這個介面的一個實現物件RMContextImpl:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

public class RMContextImpl implements RMContext {

  //中央非同步排程器。RM中的各個服務和元件以及它們處理和輸出的事件型別都是通過中央非同步排程器組織在一起的,這樣可以有效提高系統的吞吐量。

  private final Dispatcher rmDispatcher;

 

  private final ConcurrentMap<ApplicationId, RMApp> applications//應用程式列表

    new ConcurrentHashMap<ApplicationId, RMApp>();

 

  private final ConcurrentMap<NodeId, RMNode> nodes//節點列表

    new ConcurrentHashMap<NodeId, RMNode>();

   

  private final ConcurrentMap<String, RMNode> inactiveNodes//非活躍節點列表

    new ConcurrentHashMap<String, RMNode>();

  //正在執行中的AP心跳監控物件

  private AMLivelinessMonitor amLivelinessMonitor;//正在執行中的AP心跳監控物件

  //執行完畢後的AM心跳監控物件

  private AMLivelinessMonitor amFinishingMonitor;

  //用於儲存ResourceManager執行狀態

  private RMStateStore stateStore = null;

  //用於Container的超時監控,應用程式必須在一定時間內(預設10Min)使用分配到的Container去執行task,否則會被回收

  private ContainerAllocationExpirer containerAllocationExpirer;

  //下面變數都是與安全管理相關的物件

  private final DelegationTokenRenewer delegationTokenRenewer;

  private final AMRMTokenSecretManager amRMTokenSecretManager;

  private final RMContainerTokenSecretManager containerTokenSecretManager;

  private final NMTokenSecretManagerInRM nmTokenSecretManager;

  private final ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager;

  private ClientRMService clientRMService;

  private RMDelegationTokenSecretManager rmDelegationTokenSecretManager;

  ......

  }

  • AdminService

AdminService和ClientRMService一樣都是作為RPC的服務端,它針對的處理管理員RPC請求,負責訪問許可權的控制,中Yarn中管理員許可權的設定可以在yarn-site.xml中yarn.admi.acl項進行設定,該項的預設值是*,也就是說如果不進行設定的話就當所有的使用者都是管理員。從程式碼上看,它是ResourceManagerAdministrationProtocol協議的一個實現:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

public class AdminService extends AbstractService implements ResourceManagerAdministrationProtocol {

 

  private static final Log LOG = LogFactory.getLog(AdminService.class);

 

  private final Configuration conf;

  private final ResourceScheduler scheduler;

  private final RMContext rmContext;

  private final NodesListManager nodesListManager;

   

  private final ClientRMService clientRMService;

  private final ApplicationMasterService applicationMasterService;

  private final ResourceTrackerService resourceTrackerService;

   

  private Server server;

  private InetSocketAddress masterServiceAddress;

  private AccessControlList adminAcl;

   

  private final RecordFactory recordFactory = 

    RecordFactoryProvider.getRecordFactory(null);

    .....

    }

AdminService程式碼和ClientRMService比較相似,它各類功能物件也差不多。

2、NodeManager管理

    NodeManager主要是通過NMLivelinessMonitor、ResourceTrackerService和NodeListManager這3大元件來對NodeManager的生命週期、心跳處理以及黑名單處理。

(1)ResourceTrackerService

    ResourceTrackerService是RPC協議ResourceTracker的一個實現,它作為一個RPC Server端接收NodeManager的RPC請求,請求主要包含2種資訊,註冊NodeManager和處理心跳資訊。NodeManger啟動時第一件事就是像ResourceManager註冊,註冊時NodeManager發給ResourceTrackerService的RPC包主要包含NodeManager所在節點的可用資源總量、對外開放的htpp埠、節點的host和port等資訊,具體程式碼看ResourceTrackerService#registerNodeManager方法:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

@SuppressWarnings("unchecked")

  @Override

  public RegisterNodeManagerResponse registerNodeManager(

      RegisterNodeManagerRequest request) throws YarnException,

      IOException {

 

    NodeId nodeId = request.getNodeId();//從NodeManager帶來的NodeID

    String host = nodeId.getHost();//NodeManager所在節點的host

    int cmPort = nodeId.getPort(); //NodeManager所在節點的port

    int httpPort = request.getHttpPort();//對外開放的http埠

    Resource capability = request.getResource();//獲得NodeManager所在節點的資源上限

 

    RegisterNodeManagerResponse response = recordFactory

        .newRecordInstance(RegisterNodeManagerResponse.class);

 

    // Check if this node is a 'valid' node

    //檢測節點host名稱的的合法性

    if (!this.nodesListManager.isValidNode(host)) {

      String message =

          "Disallowed NodeManager from  " + host

              ", Sending SHUTDOWN signal to the NodeManager.";

      LOG.info(message);

      response.setDiagnosticsMessage(message);

      response.setNodeAction(NodeAction.SHUTDOWN);

      return response;

    }

    .....

    }

ResourceTrackerService另外一種功能就是處理心跳資訊了,當NodeManager啟動後,它會週期性地呼叫RPC函式ResourceTracker#nodeHeartbeat彙報心跳,心跳資訊主要包含該節點的各個Container的執行狀態、正在執行的Application列表、節點的健康狀況等,隨後ResourceManager為該NodeManager返回需要釋放的Container列表、Application列表等資訊。其中心跳資訊處理的流程:首先,從NodeManager發來的心跳包中獲得節點的狀態狀態資訊,然後檢測該節點是否已經註冊過,然後檢測該節點的host名稱是否合法,例如是否在excluded列表中,然後再檢測該次心跳是不是第一次心跳資訊,這點非常重要,因為關係到心跳的重複傳送與應答的相關問題。其實ResourceTrackerService和NodeManager的心跳處理機制和之前Hadoop1.x中的JobTracker與TaskTacker之間的心跳處理很想象,具體請看我之前寫的一篇blog:http://zengzhaozheng.blog.51cto.com/8219051/1359887 ,再然後,為NodeManager返回心跳應答資訊,最後,想RMNode傳送該NodeManager的狀態資訊並且儲存最近一次心跳應答資訊。再具體看看ResourceTracker#nodeHeart方法:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

 public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)

      throws YarnException, IOException {

    //從RPC Clinet中獲得nodeManager所在節點的健康狀況

    NodeStatus remoteNodeStatus = request.getNodeStatus();

    /**

     * Here is the node heartbeat sequence...

     * 1. Check if it's a registered node

     * 2. Check if it's a valid (i.e. not excluded) node 

     * 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat 

     * 4. Send healthStatus to RMNode

     */

 

    NodeId nodeId = remoteNodeStatus.getNodeId();

 

    // 1. Check if it's a registered node

    RMNode rmNode = this.rmContext.getRMNodes().get(nodeId);

    if (rmNode == null) {

      /* node does not exist */

      String message = "Node not found resyncing " + remoteNodeStatus.getNodeId();

      LOG.info(message);

      resync.setDiagnosticsMessage(message);

      return resync;

    }

 

    // Send ping

    this.nmLivelinessMonitor.receivedPing(nodeId);

 

    // 2. Check if it's a valid (i.e. not excluded) node

    if (!this.nodesListManager.isValidNode(rmNode.getHostName())) {

      String message =

          "Disallowed NodeManager nodeId: " + nodeId + " hostname: "

              + rmNode.getNodeAddress();

      LOG.info(message);

      shutDown.setDiagnosticsMessage(message);

      this.rmContext.getDispatcher().getEventHandler().handle(

          new RMNodeEvent(nodeId, RMNodeEventType.DECOMMISSION));

      return shutDown;

    }

     

    // 3. Check if it's a 'fresh' heartbeat i.e. not duplicate heartbeat

    NodeHeartbeatResponse lastNodeHeartbeatResponse = rmNode.getLastNodeHeartBeatResponse();

    if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse

        .getResponseId()) {

      LOG.info("Received duplicate heartbeat from node "

          + rmNode.getNodeAddress());

      return lastNodeHeartbeatResponse;

    else if (remoteNodeStatus.getResponseId() + 1 < lastNodeHeartbeatResponse

        .getResponseId()) {

      String message =

          "Too far behind rm response id:"

              + lastNodeHeartbeatResponse.getResponseId() + " nm response id:"

              + remoteNodeStatus.getResponseId();

      LOG.info(message);

      resync.setDiagnosticsMessage(message);

      // TODO: Just sending reboot is not enough. Think more.

      this.rmContext.getDispatcher().getEventHandler().handle(

          new RMNodeEvent(nodeId, RMNodeEventType.REBOOTING));

      return resync;

    }

 

    // Heartbeat response

    NodeHeartbeatResponse nodeHeartBeatResponse = YarnServerBuilderUtils

        .newNodeHeartbeatResponse(lastNodeHeartbeatResponse.

            getResponseId() + 1, NodeAction.NORMAL, nullnullnullnull,

            nextHeartBeatInterval);

    rmNode.updateNodeHeartbeatResponseForCleanup(nodeHeartBeatResponse);

 

    populateKeys(request, nodeHeartBeatResponse);

 

    // 4. Send status to RMNode, saving the latest response.

    this.rmContext.getDispatcher().getEventHandler().handle(

        new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),

            remoteNodeStatus.getContainersStatuses(), 

            remoteNodeStatus.getKeepAliveApplications(), nodeHeartBeatResponse));

 

    return nodeHeartBeatResponse;

  }

(2)NodeListManager

    NodeListManager主要分管黑名單(include列表)和白名單(exlude列表)管理功能,分別有yarnresouecemanager.nodes.include-path和yarnresourcemanager.nodes.exclude-path指定。黑名單列表中的nodes不能夠和RM直接通訊(直接丟擲RPC異常),管理員可以對這兩個列表進行編輯,然後使用$HADOOP_HOME/bin/yarn rmadmin-refreshNodes動態載入修改後的列表,使之生效。

(3)NMLivelinessMonitor

    NMLivelinessMonitor主要是分管心跳異常請求。該服務會週期性地遍歷叢集中的所有NodeManager,如果某個NodeManager在一定時間內(預設10min,可以有引數yarn.nm.liveness-monitor.expiry-interval-ms配置)沒有進行心跳彙報,那麼則認為它已經死掉,同時在該節點上執行的Container也會被置為執行失敗釋放資源。那麼這些被置為失敗的Container是不會直接被RM分配執行的,RM只是負責將這些被置為失敗的Container資訊告訴它們所對應的ApplicationMaster,需不需要重新執行它說的算,如果需要從新執行的話,該ApplicationMaster要從新向RM申請資源,然後由ApplicationMaster與對應的NodeManager通訊以從新執行之前失敗的Container。

2、ApplicationMaster管理模組

 後面部分請看:http://zengzhaozheng.blog.51cto.com/8219051/1542067 

本文出自 “螞蟻” 部落格,請務必保留此出處http://zengzhaozheng.blog.51cto.com/8219051/1438204