A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

AM注册到RM

1. 从Yarn源码剖析(三)-- ApplicationMaster的启动可知提交应用程序至yarn时最后启动了ApplicationMaster类,所以我们直接来看这个类(是spark自己封装的AM)的main方法,可以看到spark是通过调用AMRMClient客户端来调用相关API来实现AM注册的,以及资源的调度。



  • amClient = AMRMClient.createAMRMClient()  //创建一个客户端实例



  • amClient.init(conf)  //初始化



  • amClient.start()  //启动客户端



  • this.uiHistoryAddress = uiHistoryAddress







  • val trackingUrl = uiAddress.getOrElse {



  •   if (sparkConf.get(ALLOW_HISTORY_SERVER_TRACKING_URL)) uiHistoryAddress else ""



  • }







  • logInfo("Registering the ApplicationMaster")



  • synchronized {



  •   amClient.registerApplicationMaster(Utils.localHostName(), 0, trackingUrl)  //注册AM



  •   registered = true



  • }


2. 那我们就对应这几个API到hadoop源码中去剖析AM注册启动的过程,先从AMRMClient.createAMRMClient()入手,我们找到hadoop代码中org.apache.hadoop.yarn.client.api.AMRMClient这个类(hadoop是2.7.4版本的代码),可以看到就是new了一个实现类AMRMClientImpl



  • public static <T extends ContainerRequest> AMRMClient<T> createAMRMClient() {



  •   AMRMClient<T> client = new AMRMClientImpl<T>();



  •   return client;



  • }


3. 接下来看一下amClient.init(conf)的初始化方法,那我们这里显然是由 AMRMClientImpl()实现,因此我们去看它的serviceInit(),内部就是把配置文件做了一个初始化,就不多做分析了。接着看amClient.start()方法,它同init()方法一样,相关的初始化和启动方法在Yarn源码剖析(一) --- RM与NM服务启动以及心跳通信有做介绍,显然也是由AMRMClientImpl()实现的,内部就实现了为指定的协议创建资源管理器的代理。至此AM服务的初始化、启动就完成了,接下来就是注册这个AM到RM上了,从spark端的代码中可以看到,是调用了AMRMClient.registerApplicationMaster()方法。



  • synchronized {



  •   amClient.registerApplicationMaster(Utils.localHostName(), 0, trackingUrl)



  •   registered = true



  • }


4. 那我们就来看一下这个代码AMRMClient.registerApplicationMaster(),由它的实现类AMRMClientImpl()实现。



  •   private RegisterApplicationMasterResponse registerApplicationMaster()



  •       throws YarnException, IOException {



  •       //设置资源请求



  •     RegisterApplicationMasterRequest request =



  •         RegisterApplicationMasterRequest.newInstance(this.appHostName,



  •             this.appHostPort, this.appTrackingUrl);



  •             //获取响应



  •     RegisterApplicationMasterResponse response =



  •         rmClient.registerApplicationMaster(request);  //关键方法



  •     synchronized (this) {



  •       lastResponseId = 0;



  •       if (!response.getNMTokensFromPreviousAttempts().isEmpty()) {



  •         populateNMTokens(response.getNMTokensFromPreviousAttempts());



  •       }



  •     }



  •     return response;



  •   }


5. 很明显这里有一个非常关键的方法rmClient.registerApplicationMaster(request),点进去发现这个方法由AM的服务端代码来实现,这内部就是真正做注册的事情了。下面这段代码就是内部真正做注册的代码,这块代码涉及到yarn状态机的转换,通过提交不同的事件来做相应的处理,这块内容单独拿出来讲完全可以形成一篇长文,所以有兴趣的朋友可以自己去阅读源码或者查阅资料。



  • LOG.info("AM registration " + applicationAttemptId);



  • this.rmContext



  •   .getDispatcher()



  •   .getEventHandler()



  •   .handle(



  •     new RMAppAttemptRegistrationEvent(applicationAttemptId, request



  •       .getHost(), request.getRpcPort(), request.getTrackingUrl()));


6. 根据状事件状态的提交,AM注册会交由到RMAppAttemptImpl中的AMRegisteredTransition.transition类来实现,该方法实现了状态机的转换由ACCEPTED状态变换RUNNING,至此AM注册成功。



  •   // Let the app know



  •   appAttempt.eventHandler.handle(new RMAppEvent(appAttempt



  •       .getAppAttemptId().getApplicationId(),



  •      RMAppEventType.ATTEMPT_REGISTERED));


AM申请Container

1. 从下文代码可以看出,调用了AMRMClient的allocate方法来实现资源调度,所以我们去看hadoop代码中AM是如何做资源调度的。



  •   //内部是对封装请求信息



  •   updateResourceRequests()



  •   ​



  •   val progressIndicator = 0.1f



  •   // requests.



  •   //此方法就是去yarn查看所有节点可用的资源信息



  •   val allocateResponse = amClient.allocate(progressIndicator)



  •   ​



  •   val allocatedContainers = allocateResponse.getAllocatedContainers()



2. 先看一下updateResourceRequests()内部,调用了以下这个方法,这个方法就是将请求传给RM,这个方法我就不做具体分析了。



  •   newLocalityRequests.foreach { request =>



  •     amClient.addContainerRequest(request)



  •   }


3. 接下来就看关键方法amClient.allocate(progressIndicator),当然该方法也是有AMRMClientImpl实现类实现的,并且在该实现类初始化时,实例化了ApplicationMasterProtocol,并调用该对象allocate,由ApplicationMasterService实现,这个代码比较的长,所以我们分段来看关键的代码,先来看这段代码,这里先去执行了STATUS_UPDATE事件,更新保存一下各个节点的资源信息(因为在之前一系列的操作中,集群的资源可能已经发生变化了)



  •   // Send the status update to the appAttempt.



  •   this.rmContext.getDispatcher().getEventHandler().handle(



  •       new RMAppAttemptStatusupdateEvent(appAttemptId, request



  •           .getProgress()));


4. 接下来则是去检查一下调度器队列中的cpu和内存是否足够



  •   //去检查队列中的cpu和内存是否足够



  •   RMServerUtils.normalizeAndValidateRequests(ask,



  •       rScheduler.getMaximumResourceCapability(), app.getQueue(),



  •       rScheduler, rmContext);


5. 下面就是实现了调度器的资源调度,我们默认分析Capatity Scheduler



  •   // Send new requests to appAttempt.



  •   Allocation allocation =



  •       this.rScheduler.allocate(appAttemptId, ask, release,



  •           blacklistAdditions, blacklistRemovals);


6. 对于请求,我们会做一个资源规整化,在各类调度器中有一个规整化因子,capatity以及FIFO调度器都是不可配的,由yarn的最小可调度资源来决定,Fair调度器则可以配置,什么叫规整化因子,其实很好理解,假设规整化因子是1G,如果此时申请的资源是800M,那么yarn也会调度1个G内存的Container供任务使用



  •   // Sanity check



  •   //规整化资源请求



  •   SchedulerUtils.normalizeRequests(



  •       ask, getResourceCalculator(), getClusterResource(),



  •       getMinimumResourceCapability(), getMaximumResourceCapability());


7. 我们继续看最后返回了getAllocation方法,内部有一个重要的方法pullNewlyAllocatedContainersAndNMTokens(),所有的调度器最终都会来执行这个方法,这个方法内部则是做对容器鉴权以及得到申请下的Container并返回,该方法内部有一个newlyAllocatedContainers集合,代码看到这就会有一定的疑问,这个Container的集合是怎么获得的,为什么在这里就拿到了被分配的Container集合呢?

8. 这个就要从NM与RM的心跳说起了,当AM启动注册到RM时,AM就发送了请求给RM,RM会与NM通信去申请资源,NM则通过心跳的方式去给出Container的集合。Yarn源码剖析(一) --- RM与NM服务启动以及心跳通信中我们就介绍NM与RM的心跳通信,所以此时我们就直接去看心跳触发的STATUS_UPDATE事件的转换函数 StatusUpdateWhenHealthyTransition(),内部有这么一个方法,这个方法会触发SchedulerEventType.NODE_UPDATE事件



  •   if(rmNode.nextHeartBeat) {



  •     rmNode.nextHeartBeat = false;



  •     rmNode.context.getDispatcher().getEventHandler().handle(



  •         new NodeUpdateSchedulerEvent(rmNode));



  •   }


9. 该事件交由默认调度器处理,跳转到allocateContainersToNode(),我们分析没有Container预定的情况



  •     root.assignContainers(



  •         clusterResource,



  •         node,



  •         // TODO, now we only consider limits for parent for non-labeled



  •         // resources, should consider labeled resources as well.



  •         new ResourceLimits(labelManager.getResourceByLabel(



  •             RMNodeLabelsManager.NO_LABEL, clusterResource)));



  •   }


10. 方法先进入根队列类ParnetQueue去处理,来看一下代码分析,只选了关键部分的代码



  •   while (canAssign(clusterResource, node)) {  //判断节点资源是否足够



  •     if (LOG.isDebugEnabled()) {



  •       LOG.debug("Trying to assign containers to child-queue of "



  •         + getQueueName());



  •     }







  •     //判断队列是否接受该节点的注册,节点可用资源判断之类的



  •     if (!super.canAssignToThisQueue(clusterResource, nodeLabels, resourceLimits,



  •         minimumAllocation, Resources.createResource(getMetrics()



  •             .getReservedMB(), getMetrics().getReservedVirtualCores()))) {



  •       break;



  •     }







  •     // 进入子队列实现调度



  •     CSAssignment assignedToChild =



  •         assignContainersToChildQueues(clusterResource, node, resourceLimits);



  •     assignment.setType(assignedToChild.getType());



  •     }



  •   ​


11. 子队列中会做完队列资源判断后,就会进入子队列的assignContainers方法,这个方法很长,其实大多还是一些校验性的方法,不多做分析了,内部会有序的为应用程序申请Container,最后进入下面这个方法



  •   // Try to schedule



  •   CSAssignment assignment =  



  •     assignContainersOnNode(clusterResource, node, application, priority,



  •         null, currentResourceLimits);


12. 可以看一下这个方法内部做了什么,内部分为三种调度情况NodeType.NODE_LOCAL、NodeType.RACK_LOCAL、NodeType.OFF_SWITCH,我们分析NodeType.NODE_LOCAL,所以进入assignNodeLocalContainers方法,该方法也较长,不考虑预定Container的情况,直接找关键方法,



  •    // 前面都做过资源判断了,所以此处根据资源请求直接new出一个Container对象



  •   Container container = getContainer(rmContainer, application, node, capability, priority);



  •   // 添加分配Container



  •   RMContainer allocatedContainer =



  •       application.allocate(type, node, priority, request, container);



  •   ​



  •   // Does the application need this resource?



  •   if (allocatedContainer == null) {



  •     return Resources.none();



  •   }



  •   ​



  •   // Inform the node



  •   node.allocateContainer(allocatedContainer); //告知节点需要添加要启动的Container集合


13. 那我们来看看关键的添加分配的方法application.allocate(),将new出来的Container传入了新的RMContainer,而且添加到newlyAllocatedContainers集合中,看到此处,大家就知道前文中的集合是怎么来的了,而后告诉状态机Container的状态转换到了STARTED



  •   // Create RMContainer



  •   RMContainer rmContainer = new RMContainerImpl(container, this



  •       .getApplicationAttemptId(), node.getNodeID(),



  •       appSchedulingInfo.getUser(), this.rmContext);



  •   ​



  •   // Add it to allContainers list.



  •   newlyAllocatedContainers.add(rmContainer);


14. 步骤11中还有一个node.allocateContainer(allocatedContainer),我们来看一下,可以看出,将这些返回的Container添加到了一个launchedContainers集合中,这个集合用于Container的启动。



  •   public synchronized void allocateContainer(RMContainer rmContainer) {



  •     Container container = rmContainer.getContainer();



  •     deductAvailableResource(container.getResource());



  •     ++numContainers;



  •   ​



  •     launchedContainers.put(container.getId(), rmContainer);



  •   }


Container的启动

1. 从上文可知spark调用的Yarn的接口去获取到了匹配的Container集合,那接下来当然是要去启动这些Container了,所以,来看看handleAllocatedContainers(allocatedContainers.asScala)



  •   val allocateResponse = amClient.allocate(progressIndicator)



  •   ​



  •   val allocatedContainers = allocateResponse.getAllocatedContainers()



  •   handleAllocatedContainers(allocatedContainers.asScala)


  ​2. 这个方法内部有一个runAllocatedContainers(containersToUse),下面我贴了这个方法关键的一个部分,将各个需要的变量传入这个线程



  •   launcherPool.execute(new Runnable {



  •     override def run(): Unit = {



  •       try {



  •         new ExecutorRunnable(



  •           Some(container),



  •           conf,



  •           sparkConf,



  •           driverUrl,



  •           executorId,



  •           executorHostname,



  •           executorMemory,



  •           executorCores,



  •           appAttemptId.getApplicationId.toString,



  •           securityMgr,



  •           localResources



  •         ).run()



  •         updateInternalState()


3. 那我们来看这个线程的run方法,初始化了启动一个NMClient,然后调用了startContainer()



  •   def run(): Unit = {



  •     logDebug("Starting Executor Container")



  •     nmClient = NMClient.createNMClient()



  •     nmClient.init(conf)



  •     nmClient.start()



  •     startContainer()



  •   }


4. 那来看一下 startContainer(),选取了最关键的方法



  •   // Send the start request to the ContainerManager



  •   try {



  •     nmClient.startContainer(container.get, ctx)



  •   } catch {



  •     case ex: Exception =>



  •       throw new SparkException(s"Exception while starting container ${container.get.getId}" +



  •         s" on host $hostname", ex)



  •   }


5. 调用了NM启动Container的接口,接口后面的内容与上一篇AM Container启动的过程基本一致,这里就不做分析了

总结

至此,围绕着Yarn源码剖析(零) --- spark任务提交到yarn的流程流程图有关spark任务的提交至运行就结束了。关于Yarn的资源调度介绍到,也就结束了,在剖析源码的过程中,蛋挞还是有很多很多的困惑,例如状态机的转换,Hadoop RPC通信等等,这些困惑的知识点,也会在后续的学习中去输出对应的博文,也希望大家在阅读蛋挞的博文时可以指出文中错误的地方~在这蛋挞不胜感激。

在整个Yarn资源调度的过程中,蛋挞参考了许多博文资料,所以在剖析的思路上难免会受到一些引导,若有侵犯博主的权益,请知会与我删除相关的信息。


1 个回复

正序浏览
奈斯
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马