YARN Source Code Analysis: The ApplicationMaster Allocation Strategy
A conversation with a friend touched on how the ApplicationMaster's container gets allocated. My impression was that it is placed randomly, but he claimed it is placed according to each node's free resources.
I had never paid attention to this logic while reading the code, so with the question now raised, let's go look at the source.
From an MR job's execution log we can see when the AM container is allocated; see the log below:
2017-04-09 03:26:17,113 INFO org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl: appattempt_1491729774382_0001_000001 State change from SUBMITTED to SCHEDULED
2017-04-09 03:26:17,415 INFO org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl: container_1491729774382_0001_01_000001 Container Transitioned from NEW to ALLOCATED
The AM container is allocated when the appattempt's state changes from SUBMITTED to SCHEDULED. The handler for the SUBMITTED → SCHEDULED transition is:
public static final class ScheduleTransition
    implements
    MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
  @Override
  public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
      RMAppAttemptEvent event) {
    ApplicationSubmissionContext subCtx = appAttempt.submissionContext;
    if (!subCtx.getUnmanagedAM()) {
      // Need reset #containers before create new attempt, because this request
      // will be passed to scheduler, and scheduler will deduct the number after
      // AM container allocated
      // Build the resource request for the AM container
      appAttempt.amReq.setNumContainers(1);
      appAttempt.amReq.setPriority(AM_CONTAINER_PRIORITY);
      // A ResourceName of ANY means any machine on any rack
      appAttempt.amReq.setResourceName(ResourceRequest.ANY);
      appAttempt.amReq.setRelaxLocality(true);
      // Let the scheduler allocate the resources
      Allocation amContainerAllocation =
          appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
              Collections.singletonList(appAttempt.amReq),
              EMPTY_CONTAINER_RELEASE_LIST, null, null);
      ...
      return RMAppAttemptState.SCHEDULED;
    } else {
      ...
    }
  }
}
First a container request is built for the AM container. Already from appAttempt.amReq.setResourceName(ResourceRequest.ANY) we can see that AM container placement is essentially random: the request places no constraint at all on the ResourceName. Still, let's keep reading the code to verify.
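As a quick aside, here is a minimal sketch of what the request built in ScheduleTransition amounts to, written with the public ResourceRequest factory methods. This is my own illustration, not RM code; the 1536 MB / 1 vcore AM size is made up, while priority 0 matches AM_CONTAINER_PRIORITY in RMAppAttemptImpl:
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;

class AmRequestSketch {
  static List<ResourceRequest> buildAmAsk() {
    ResourceRequest amReq = ResourceRequest.newInstance(
        Priority.newInstance(0),              // AM_CONTAINER_PRIORITY is priority 0
        ResourceRequest.ANY,                  // "*": no locality constraint at all
        Resource.newInstance(1536, 1),        // 1536 MB, 1 vcore (made-up AM size)
        1);                                   // exactly one AM container
    amReq.setRelaxLocality(true);
    return Collections.singletonList(amReq);
  }
}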
Once the request is built, the scheduler takes over the allocation. The default scheduler here is the CapacityScheduler:
// CapacityScheduler.java
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
    List<ResourceRequest> ask, List<ContainerId> release,
    List<String> blacklistAdditions, List<String> blacklistRemovals) {
  FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
  ...
  // Release containers
  releaseContainers(release, application);
  synchronized (application) {
    ...
    if (!ask.isEmpty()) {
      ...
      application.showRequests();
      // Put the ask into this application attempt's request map
      // Update application requests
      application.updateResourceRequests(ask);
      application.showRequests();
    }
    application.updateBlacklist(blacklistAdditions, blacklistRemovals);
    return application.getAllocation(getResourceCalculator(),
        clusterResource, getMinimumResourceCapability());
  }
}
When CapacityScheduler handles the call, application.updateResourceRequests(ask) stores the ask into a map, where it waits for an NM heartbeat to pick it up. The application here is a FiCaSchedulerApp object; a FiCaSchedulerApp in fact corresponds to an application attempt. Its updateResourceRequests:
public synchronized void updateResourceRequests(
    List<ResourceRequest> requests) {
  if (!isStopped) {
    // AppSchedulingInfo.updateResourceRequests
    appSchedulingInfo.updateResourceRequests(requests, false);
  }
}
AppSchedulingInfo records everything the application consumes, including the containers it is running and the ones that have already completed.
synchronized public void updateResourceRequests(
    List<ResourceRequest> requests, boolean recoverPreemptedRequest) {
  // Update resource requests
  for (ResourceRequest request : requests) {
    Priority priority = request.getPriority();
    String resourceName = request.getResourceName();
    boolean updatePendingResources = false;
    ResourceRequest lastRequest = null;
    // If the request's ResourceName is ResourceRequest.ANY
    // Is the AM container the only ANY request??? Surely not
    if (resourceName.equals(ResourceRequest.ANY)) {
      ...
      // Pending resources are only updated for ResourceRequest.ANY??
      updatePendingResources = true;
      // Premature optimization?
      // Assumes that we won't see more than one priority request updated
      // in one call, reasonable assumption... however, it's totally safe
      // to activate same application more than once.
      // Thus we don't need another loop ala the one in decrementOutstanding()
      // which is needed during deactivate.
      if (request.getNumContainers() > 0) {
        activeUsersManager.activateApplication(user, applicationId);
      }
    }
    // this.requests is a map holding this application's requests, keyed by
    // priority; check whether it already has entries at this priority
    Map<String, ResourceRequest> asks = this.requests.get(priority);
    // No requests at this priority yet, so create a new map
    if (asks == null) {
      asks = new HashMap<String, ResourceRequest>();
      this.requests.put(priority, asks);
      this.priorities.add(priority);
    }
    // asks exists; check whether it already holds a request with this ResourceName
    lastRequest = asks.get(resourceName);
    if (recoverPreemptedRequest && lastRequest != null) {
      // Increment the number of containers to 1, as it is recovering a
      // single container.
      request.setNumContainers(lastRequest.getNumContainers() + 1);
    }
    // The old request was pulled out into lastRequest above, and the new
    // request replaces it in asks. What happens to lastRequest? Where is it handled?
    asks.put(resourceName, request);
    if (updatePendingResources) {
      // Similarly, deactivate application?
      if (request.getNumContainers() <= 0) {
        LOG.info("checking for deactivate... ");
        checkForDeactivation();
      }
      int lastRequestContainers = lastRequest != null ? lastRequest
          .getNumContainers() : 0;
      Resource lastRequestCapability = lastRequest != null ? lastRequest
          .getCapability() : Resources.none();
      metrics.incrPendingResources(user, request.getNumContainers(),
          request.getCapability());
      metrics.decrPendingResources(user, lastRequestContainers,
          lastRequestCapability);
    }
  }
}
updateResourceRequests mainly stores the ask into requests, where it waits for an NM heartbeat to pick it up. One thing had me confused: before requests is updated, an existing request with the same ResourceName is taken out into lastRequest, so how is that lastRequest handled afterwards? Reading the tail of the method again, lastRequest appears to be used only for the metrics bookkeeping: its pending resources are decremented and the new request's incremented, while the old object itself is simply replaced in the map.
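As for the "is the AM container the only ANY request???" comment above: no. For an ordinary MR task, the MR AM asks at node, rack, and ANY level at the same time, so every ask carries an ANY entry, which is why the ANY branch drives the pending-resource accounting. A hedged sketch of such an ask (host1 and /rack1 are made-up names; priority 20 is, to my knowledge, what the MR AM uses for map tasks):
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;

class MapTaskAskSketch {
  static List<ResourceRequest> buildAsk() {
    Priority mapPriority = Priority.newInstance(20);   // MR map priority
    Resource r = Resource.newInstance(1024, 1);        // made-up task size
    return Arrays.asList(
        ResourceRequest.newInstance(mapPriority, "host1", r, 1),
        ResourceRequest.newInstance(mapPriority, "/rack1", r, 1),
        // The ANY entry always accompanies the node- and rack-level entries
        ResourceRequest.newInstance(mapPriority, ResourceRequest.ANY, r, 1));
  }
}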
After requests is updated, control returns to CapacityScheduler.allocate, whose return statement calls application.getAllocation to build an Allocation object. Container tokens are created here, and the containers that get tokens are ones already assigned to an NM, i.e. already-instantiated RMContainers. Does that mean the scheduler first files a request and then pulls the responses to earlier requests out of the newlyAllocatedContainers list? As far as I can tell, yes: this first allocate call merely files the AM ask, and once the container is later placed on a node heartbeat, the attempt receives a CONTAINER_ALLOCATED event and calls scheduler.allocate again with an empty ask to collect it.
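To make that pull model concrete, here is a toy sketch of the pattern using my own classes, not YARN's: allocate() both files new asks and, in the same call, drains whatever earlier asks have produced on previous heartbeats.
import java.util.ArrayList;
import java.util.List;

// Toy model of the pull pattern described above
class ToyScheduler {
  private final List<String> pending = new ArrayList<String>();         // asks awaiting heartbeats
  private final List<String> newlyAllocated = new ArrayList<String>();  // results of earlier asks

  synchronized List<String> allocate(List<String> asks) {
    pending.addAll(asks);                               // file the new asks
    List<String> out = new ArrayList<String>(newlyAllocated);
    newlyAllocated.clear();                             // drain earlier results
    return out;
  }

  // Called on an NM heartbeat: turn one pending ask into an allocation
  synchronized void nodeHeartbeat(String nodeId) {
    if (!pending.isEmpty()) {
      newlyAllocated.add(pending.remove(0) + " on " + nodeId);
    }
  }
}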
So once the AM container request is filed, it waits for an NM heartbeat. When some NM heartbeats in, the following runs:
// CapacityScheduler.handle, the NODE_UPDATE event
case NODE_UPDATE:
{
  NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
  RMNode node = nodeUpdatedEvent.getRMNode();
  // Update the state of the containers already on this node:
  // launch containers newly allocated to it, transition completed ones
  nodeUpdate(node);
  if (!scheduleAsynchronously) {
    // Let this node pick up pending container requests
    allocateContainersToNode(getNode(node.getNodeID()));
  }
}
After the NM's heartbeat reaches the CapacityScheduler, nodeUpdate(node) updates the state of the containers already on that node, and allocateContainersToNode is then called to pull new container requests.
private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
  ...
  // Assign new containers...
  // 1. Check for reserved applications
  // 2. Schedule if there are no reservations
  // If there are reserved containers, those are fulfilled first
  ...
  // Try to schedule more if there are no reservations to fulfill
  if (node.getReservedContainer() == null) {
    // Does this NM still have enough free resources for a container?
    if (calculator.computeAvailableContainers(node.getAvailableResource(),
        minimumAllocation) > 0) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Trying to schedule on node: " + node.getNodeName() +
            ", available: " + node.getAvailableResource());
      }
      // Assign containers
      root.assignContainers(clusterResource, node, false);
    }
  } else {
    LOG.info("Skipping scheduling since node " + node.getNodeID() +
        " is reserved by application " +
        node.getReservedContainer().getContainerId().getApplicationAttemptId()
        );
  }
}
When scheduling onto this NM, the scheduler first checks whether the node holds a reserved container; a reservation is fulfilled first, and only when there is none does it call root.assignContainers.
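A side note on the headroom check in the code above: with the DefaultResourceCalculator, which in the 2.x code base only considers memory, computeAvailableContainers is just a division. A minimal sketch with made-up numbers:
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;

class HeadroomSketch {
  public static void main(String[] args) {
    ResourceCalculator calc = new DefaultResourceCalculator();
    Resource available = Resource.newInstance(3072, 4);   // free on the node (made up)
    Resource minimum = Resource.newInstance(1024, 1);     // scheduler minimum allocation
    // DefaultResourceCalculator only looks at memory: 3072 / 1024 = 3
    int n = calc.computeAvailableContainers(available, minimum);
    System.out.println(n > 0 ? "node can take containers" : "skip node");
  }
}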
root is a CSQueue object. CSQueue is an interface implemented by the abstract class AbstractCSQueue, which is in turn extended by ParentQueue and LeafQueue. What runs here is ParentQueue.assignContainers:
public synchronized CSAssignment assignContainers(
    Resource clusterResource, FiCaSchedulerNode node, boolean needToUnreserve) {
  CSAssignment assignment =
      new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
  // if our queue cannot access this node, just return
  if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels,
      labelManager.getLabelsOnNode(node.getNodeID()))) {
    return assignment;
  }
  while (canAssign(clusterResource, node)) {
    ...
    boolean localNeedToUnreserve = false;
    Set<String> nodeLabels = labelManager.getLabelsOnNode(node.getNodeID());
    // Are we over maximum-capacity for this queue?
    if (!canAssignToThisQueue(clusterResource, nodeLabels)) {
      // check to see if we could if we unreserve first
      localNeedToUnreserve = assignToQueueIfUnreserve(clusterResource);
      if (!localNeedToUnreserve) {
        break;
      }
    }
    // Schedule
    CSAssignment assignedToChild =
        assignContainersToChildQueues(clusterResource, node,
            localNeedToUnreserve | needToUnreserve);
    assignment.setType(assignedToChild.getType());
    ...
    // Do not assign more than one container if this isn't the root queue
    // or if we've already assigned an off-switch container
    if (!rootQueue || assignment.getType() == NodeType.OFF_SWITCH) {
      if (LOG.isDebugEnabled()) {
        if (rootQueue && assignment.getType() == NodeType.OFF_SWITCH) {
          LOG.debug("Not assigning more than one off-switch container," +
              " assignments so far: " + assignment);
        }
      }
      break;
    }
  }
  return assignment;
}
Assignment first checks whether this queue may access the NM and its node labels; once those checks pass, assignContainersToChildQueues does the work:
private synchronized CSAssignment assignContainersToChildQueues(Resource cluster,
    FiCaSchedulerNode node, boolean needToUnreserve) {
  ...
  // Try to assign to most 'under-served' sub-queue
  for (Iterator<CSQueue> iter=childQueues.iterator(); iter.hasNext();) {
    CSQueue childQueue = iter.next();
    ...
    assignment = childQueue.assignContainers(cluster, node, needToUnreserve);
    ...
    // If we do assign, remove the queue and re-insert in-order to re-sort
    if (Resources.greaterThan(
        resourceCalculator, cluster,
        assignment.getResource(), Resources.none())) {
      // Remove and re-insert to sort
      iter.remove();
      LOG.info("Re-sorting assigned queue: " + childQueue.getQueuePath() +
          " stats: " + childQueue);
      childQueues.add(childQueue);
      if (LOG.isDebugEnabled()) {
        printChildQueues();
      }
      break;
    }
  }
  return assignment;
}
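Why the remove-and-re-insert dance in the loop above? childQueues is kept in a sorted collection ordered by utilization, and a Java TreeSet will not re-sort an element whose comparison key mutates in place; you must take it out, let the key change, and put it back. A standalone sketch of the same pattern (toy Q class, not YARN code):
import java.util.Comparator;
import java.util.TreeSet;

class ResortSketch {
  static final class Q {
    final String name; double used;
    Q(String name, double used) { this.name = name; this.used = used; }
  }

  public static void main(String[] args) {
    TreeSet<Q> queues = new TreeSet<Q>(
        Comparator.comparingDouble((Q q) -> q.used).thenComparing(q -> q.name));
    Q a = new Q("a", 0.1), b = new Q("b", 0.5);
    queues.add(a);
    queues.add(b);
    // 'a' just received an assignment: remove it BEFORE its key changes,
    // mirroring iter.remove() + childQueues.add(childQueue) above
    queues.remove(a);
    a.used = 0.9;
    queues.add(a);
    System.out.println(queues.first().name);   // "b" is now the most under-served
  }
}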
assignContainersToChildQueues calls assignContainers on each child queue in turn and, after a successful assignment, removes that childQueue and re-inserts it so the ordering is refreshed, as in the sketch above. Descending the tree this way eventually reaches a leaf; LeafQueue.assignContainers looks like this:
public synchronized CSAssignment assignContainers(Resource clusterResource,
    FiCaSchedulerNode node, boolean needToUnreserve) {
  ...
  // if our queue cannot access this node, just return
  if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels,
      labelManager.getLabelsOnNode(node.getNodeID()))) {
    return NULL_ASSIGNMENT;
  }
  // Check for reserved resources
  RMContainer reservedContainer = node.getReservedContainer();
  if (reservedContainer != null) {
    FiCaSchedulerApp application =
        getApplication(reservedContainer.getApplicationAttemptId());
    synchronized (application) {
      return assignReservedContainer(application, node, reservedContainer,
          clusterResource);
    }
  }
  // Try to assign containers to applications in order
  for (FiCaSchedulerApp application : activeApplications) {
    ...
    // Lock the application
    synchronized (application) {
      // Check if this resource is on the blacklist
      if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
        continue;
      }
      // Schedule in priority order
      for (Priority priority : application.getPriorities()) {
        // Why ANY? If this application has no ANY request at this priority,
        // nothing gets assigned? Need to debug this to confirm.
        // (As seen in updateResourceRequests above, every ask carries an ANY entry.)
        ResourceRequest anyRequest =
            application.getResourceRequest(priority, ResourceRequest.ANY);
        if (null == anyRequest) {
          continue;
        }
        // Required resource
        Resource required = anyRequest.getCapability();
        // Do we need containers at this 'priority'?
        if (application.getTotalRequiredResources(priority) <= 0) {
          continue;
        }
        if (!this.reservationsContinueLooking) {
          if (!needContainers(application, priority, required)) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("doesn't need containers based on reservation algo!");
            }
            continue;
          }
        }
        Set<String> requestedNodeLabels =
            getRequestLabelSetByExpression(anyRequest
                .getNodeLabelExpression());
        // Compute user-limit & set headroom
        // Note: We compute both user-limit & headroom with the highest
        // priority request as the target.
        // This works since we never assign lower priority requests
        // before all higher priority ones are serviced.
        Resource userLimit =
            computeUserLimitAndSetHeadroom(application, clusterResource,
                required, requestedNodeLabels);
        // Check queue max-capacity limit
        if (!canAssignToThisQueue(clusterResource, required,
            labelManager.getLabelsOnNode(node.getNodeID()), application, true)) {
          return NULL_ASSIGNMENT;
        }
        // Check user limit
        if (!assignToUser(clusterResource, application.getUser(), userLimit,
            application, true, requestedNodeLabels)) {
          break;
        }
        // Inform the application it is about to get a scheduling opportunity
        // What is this for? Counting scheduling opportunities?
        // (See the delay scheduling note below.)
        application.addSchedulingOpportunity(priority);
        // Try to schedule
        CSAssignment assignment =
            assignContainersOnNode(clusterResource, node, application, priority,
                null, needToUnreserve);
        // Did the application skip this node?
        if (assignment.getSkipped()) {
          // Don't count 'skipped nodes' as a scheduling opportunity!
          application.subtractSchedulingOpportunity(priority);
          continue;
        }
        // Did we schedule or reserve a container?
        Resource assigned = assignment.getResource();
        if (Resources.greaterThan(
            resourceCalculator, clusterResource, assigned, Resources.none())) {
          // Book-keeping
          // Note: Update headroom to account for current allocation too...
          allocateResource(clusterResource, application, assigned,
              labelManager.getLabelsOnNode(node.getNodeID()));
          // Don't reset scheduling opportunities for non-local assignments
          // otherwise the app will be delayed for each non-local assignment.
          // This helps apps with many off-cluster requests schedule faster.
          if (assignment.getType() != NodeType.OFF_SWITCH) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("Resetting scheduling opportunities");
            }
            application.resetSchedulingOpportunities(priority);
          }
          // Done
          return assignment;
        } else {
          // Do not assign out of order w.r.t priorities
          break;
        }
      }
    }
    if(LOG.isDebugEnabled()) {
      LOG.debug("post-assignContainers for application "
          + application.getApplicationId());
    }
    application.showRequests();
  }
  return NULL_ASSIGNMENT;
}
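About addSchedulingOpportunity in the code above: as far as I can tell this implements delay scheduling. Each time an application gets a shot at a node, its opportunity count at that priority grows, and canAssign then permits a rack-local or off-switch placement only once enough opportunities have been passed up. A simplified, hedged sketch of that threshold logic (illustrative, not the exact LeafQueue.canAssign code):
class DelaySchedulingSketch {
  // Off-switch placement is permitted only after the app has let roughly
  // localityWaitFactor * requiredContainers scheduling opportunities go by
  static boolean allowOffSwitch(long missedOpportunities,
                                int requiredContainers,
                                float localityWaitFactor) {
    float threshold = requiredContainers * localityWaitFactor;
    return missedOpportunities > threshold;
  }

  public static void main(String[] args) {
    // With 4 required containers and a wait factor of 0.5 (made-up numbers),
    // the third opportunity is the first one allowed to go off-switch
    for (long missed = 1; missed <= 3; missed++) {
      System.out.println(missed + " -> " + allowOffSwitch(missed, 4, 0.5f));
    }
  }
}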
LeafQueue.assignContainers iterates over the container requests of the applications running in this queue and, after the series of checks above, calls assignContainersOnNode to do the actual placement:
private CSAssignment assignContainersOnNode(Resource clusterResource,
    FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
    RMContainer reservedContainer, boolean needToUnreserve) {
  Resource assigned = Resources.none();
  // Is there a node-local request naming this node?
  ResourceRequest nodeLocalResourceRequest =
      application.getResourceRequest(priority, node.getNodeName());
  if (nodeLocalResourceRequest != null) {
    assigned =
        assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
            node, application, priority, reservedContainer, needToUnreserve);
    if (Resources.greaterThan(resourceCalculator, clusterResource,
        assigned, Resources.none())) {
      return new CSAssignment(assigned, NodeType.NODE_LOCAL);
    }
  }
  // Is there a rack-local request naming this node's rack?
  ResourceRequest rackLocalResourceRequest =
      application.getResourceRequest(priority, node.getRackName());
  if (rackLocalResourceRequest != null) {
    if (!rackLocalResourceRequest.getRelaxLocality()) {
      return SKIP_ASSIGNMENT;
    }
    assigned =
        assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
            node, application, priority, reservedContainer, needToUnreserve);
    if (Resources.greaterThan(resourceCalculator, clusterResource,
        assigned, Resources.none())) {
      return new CSAssignment(assigned, NodeType.RACK_LOCAL);
    }
  }
  // Finally the off-switch request, i.e. ResourceName ANY
  ResourceRequest offSwitchResourceRequest =
      application.getResourceRequest(priority, ResourceRequest.ANY);
  if (offSwitchResourceRequest != null) {
    if (!offSwitchResourceRequest.getRelaxLocality()) {
      return SKIP_ASSIGNMENT;
    }
    return new CSAssignment(
        assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
            node, application, priority, reservedContainer, needToUnreserve),
        NodeType.OFF_SWITCH);
  }
  return SKIP_ASSIGNMENT;
}
assignContainersOnNode tries the request's locality levels in order: node-local, then rack-local, then off-switch. Since the AM container's ResourceRequest is ANY, only the off-switch path matters here:
private Resource assignOffSwitchContainers(
    Resource clusterResource, ResourceRequest offSwitchResourceRequest,
    FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
    RMContainer reservedContainer, boolean needToUnreserve) {
  if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
      reservedContainer)) {
    return assignContainer(clusterResource, node, application, priority,
        offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
        needToUnreserve);
  }
  return Resources.none();
}
assignOffSwitchContainers in turn calls assignContainer; let's keep following:
private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node,
    FiCaSchedulerApp application, Priority priority,
    ResourceRequest request, NodeType type, RMContainer rmContainer,
    boolean needToUnreserve) {
  ...
  // Resources the container asks for
  Resource capability = request.getCapability();
  // Resources currently free on the node
  Resource available = node.getAvailableResource();
  // Total resources of the node
  Resource totalResource = node.getTotalResource();
  // Does the request exceed the node's total capacity?
  if (!Resources.fitsIn(capability, totalResource)) {
    LOG.warn("Node : " + node.getNodeID()
        + " does not have sufficient resource for request : " + request
        + " node total capability : " + node.getTotalResource());
    return Resources.none();
  }
  assert Resources.greaterThan(
      resourceCalculator, clusterResource, available, Resources.none());
  // Create the container if necessary
  // (this is where the containerId gets generated)
  Container container =
      getContainer(rmContainer, application, node, capability, priority);
  ...
  // A reserved container is considered first; only when a normal container
  // can be allocated do we check whether the free resources suffice
  // Can we allocate a container on this node?
  int availableContainers =
      resourceCalculator.computeAvailableContainers(available, capability);
  if (availableContainers > 0) {
    // Allocate...
    ...
    // Inform the application
    RMContainer allocatedContainer =
        application.allocate(type, node, priority, request, container);
    // Does the application need this resource?
    if (allocatedContainer == null) {
      return Resources.none();
    }
    // Inform the node: the container goes into its launchedContainers map
    node.allocateContainer(allocatedContainer);
    LOG.info("assignedContainer" +
        " application attempt=" + application.getApplicationAttemptId() +
        " container=" + container +
        " queue=" + this +
        " clusterResource=" + clusterResource);
    return container.getResource();
  } else {
    // if we are allowed to allocate but this node doesn't have space, reserve it or
    // if this was an already a reserved container, reserve it again
    ...
    return Resources.none();
  }
}
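A quick note before moving on: the Resources.fitsIn check at the top of assignContainer is a plain componentwise comparison. A minimal sketch with made-up sizes:
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.resource.Resources;

class FitsInSketch {
  public static void main(String[] args) {
    Resource ask = Resource.newInstance(2048, 1);     // container request (made up)
    Resource total = Resource.newInstance(8192, 8);   // node's total capacity
    // true iff 2048 <= 8192 and 1 <= 8
    System.out.println(Resources.fitsIn(ask, total));
  }
}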
assignContainer first checks whether the requested resources exceed the node's total capacity. If not, getContainer checks whether the node already holds a reserved container; if it doesn't, createContainer generates a containerId. With a containerId in hand, it checks whether the node's free resources can cover the allocation; if so it calls application.allocate, application being a FiCaSchedulerApp object. Finally the container is recorded in the node's launchedContainers and will later be returned to the node on a heartbeat. The allocate code:
synchronized public RMContainer allocate(NodeType type, FiCaSchedulerNode node,
    Priority priority, ResourceRequest request,
    Container container) {
  ...
  // On the RM side a container is represented as an RMContainer
  // Create RMContainer
  RMContainer rmContainer = new RMContainerImpl(container, this
      .getApplicationAttemptId(), node.getNodeID(),
      appSchedulingInfo.getUser(), this.rmContext);
  // Add it to allContainers list.
  // The new container goes into the newlyAllocatedContainers list;
  // the scheduler pulls containers out of it when answering allocate calls
  newlyAllocatedContainers.add(rmContainer);
  liveContainers.put(container.getId(), rmContainer);
  // Update consumption and track allocations
  List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
      type, node, priority, request, container);
  Resources.addTo(currentConsumption, container.getResource());
  // Update resource requests related to "request" and store in RMContainer
  ((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList);
  // Inform the container
  // Fire an event telling the container it is ready, driving the RMContainer
  // state machine (NEW -> ALLOCATED, as in the log at the top of this post)
  rmContainer.handle(
      new RMContainerEvent(container.getId(), RMContainerEventType.START));
  ...
  return rmContainer;
}
allocate creates an RMContainer and adds it to the newlyAllocatedContainers list, from which the scheduler later pulls containers to hand out.
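For the AM's own task containers, the same list feeds the AllocateResponse that an application master polls for over the AMRMClient API. A hedged consumer-side sketch (sizes and priority are made up; the AM container itself is instead pulled internally by the RMAppAttempt, as noted earlier):
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

class AllocatePollSketch {
  public static void main(String[] args) throws Exception {
    AMRMClient<ContainerRequest> amClient = AMRMClient.createAMRMClient();
    amClient.init(new YarnConfiguration());
    amClient.start();
    amClient.registerApplicationMaster("", 0, "");
    // File an ask with no node/rack constraint, i.e. ResourceRequest.ANY only
    amClient.addContainerRequest(new ContainerRequest(
        Resource.newInstance(1024, 1), null, null, Priority.newInstance(0)));
    // A later allocate() call drains what newlyAllocatedContainers produced
    AllocateResponse resp = amClient.allocate(0.0f);
    for (Container c : resp.getAllocatedContainers()) {
      System.out.println("got " + c.getId() + " on " + c.getNodeId());
    }
  }
}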
Summary
I've more or less followed the whole flow through the code, though some details are still fuzzy; I'll debug them properly later and update this post. My current understanding: the scheduler first files the container request and, in the same allocate call, checks whether newlyAllocatedContainers already holds allocated containers, creating tokens for the ones it returns. Then, when an NM heartbeats in, the matching request is taken out of requests and instantiated as an RMContainer, which goes onto the newlyAllocatedContainers list to be picked up by a later allocate call. Nothing on this path compares free resources across nodes for the AM container: with ResourceRequest.ANY it simply lands on whichever node's heartbeat arrives first with enough headroom, which is why the placement looks random.