A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

前言

在上文Yarn源码剖析(二) --- spark-submit,我们介绍了spark任务通过spark-submit提交任务至yarn申请资源至启动的全流程,本篇将介绍启动过程中ApplicationMaster(后文简称AM)是如何启动。

AM的启动与Container的申请

1. 在Yarn源码剖析(二)中yarnClient最终调用submitApplication方法提交任务,传入的参数带有AM启动的上下文,因此AM的启动就是在yarn这个方法中实现的



  • val containerContext = createContainerLaunchContext(newAppResponse) //封装AM启动的上下文



  • val appContext = createApplicationSubmissionContext(newApp, containerContext) //App的上下文







  • yarnClient.submitApplication(appContext) //提交任务


2. AM的启动异常的复杂,篇幅巨大,下面我会摘选重要的部分做分析,spark在此处封装好AM运行的上下文后,最终在yarn的事件处理机制会运行这些上下文,回调到spark中的AM类,client模式和cluster模式运行的类是不一样的,具体的运行类如下所示:



  • val amClass =



  • if (isClusterMode) { //集群cluster模式



  •     Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName



  • } else { //client模式



  •     Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName



  • }


3. 那显而易见的,我们应该从yarnClient.submitApplication(appContext)去分析hadoop端的代码,分析yarn是如何来通过这个上下文来启动spark自己封装的这个AM的,显而易见这个接口由YarnClientImpl实现,在该方法内部又调用了ApplicationClientProtocol.submitApplication,这个类是yarn利用rpc相互通信的关键类,这里也不多做介绍了,我们看到提交任务后会启动一个死循环,等待任务提交完成。



  • //request是包含了我们服务整体参数以及脚本的对象,提交至RM



  • rmClient.submitApplication(request);







  • int pollCount = 0;



  • long startTime = System.currentTimeMillis();



  • EnumSet<YarnApplicationState> waitingStates =



  • EnumSet.of(YarnApplicationState.NEW,



  • YarnApplicationState.NEW_SAVING,



  • YarnApplicationState.SUBMITTED);



  • EnumSet<YarnApplicationState> failToSubmitStates =



  • EnumSet.of(YarnApplicationState.FAILED,



  • YarnApplicationState.KILLED);



  • while (true) {



  •      try {



  •          ApplicationReport appReport = getApplicationReport(applicationId);



  •          YarnApplicationState state = appReport.getYarnApplicationState();



  •          if (!waitingStates.contains(state)) {



  •              if(failToSubmitStates.contains(state)) {



  •                  throw new YarnException("Failed to submit " + applicationId +



  •                  " to YARN : " + appReport.getDiagnostics());



  •              }



  •          LOG.info("Submitted application " + applicationId);



  •          break;



  •      }






4. 这个submitApplication是由ClientRMService来实现的,我把整段方法都贴进来了,所以我把分析内容放到了代码的注释中



  • //为了保证安全性ApplicationSubmissionContext在这里会被验证,哪些独立于RM



  • //字段在此处验证,而依赖于RM发的则在RMAppManager被验证



  • String user = null;



  • try {



  •      user = UserGroupInformation.getCurrentUser().getShortUserName();



  • } catch (IOException ie) {



  •     LOG.warn("Unable to get the current user.", ie);



  •     RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,



  •     ie.getMessage(), "ClientRMService",



  •     "Exception in submitting application", applicationId);



  •     throw RPCUtil.getRemoteException(ie);



  • }







  • //确认app是否被放在了rmContext中,如果是则响应



  • if (rmContext.getRMApps().get(applicationId) != null) {



  • LOG.info("This is an earlier submitted application: " + applicationId);



  • return SubmitApplicationResponse.newInstance();



  • }







  • //判断任务队列



  • if (submissionContext.getQueue() == null) {



  • submissionContext.setQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);



  • }



  • //判断是否是无效的任务名称



  • if (submissionContext.getApplicationName() == null) {



  • submissionContext.setApplicationName(



  • YarnConfiguration.DEFAULT_APPLICATION_NAME);



  • }



  • //任务类型判断



  • if (submissionContext.getApplicationType() == null) {



  • submissionContext



  • .setApplicationType(YarnConfiguration.DEFAULT_APPLICATION_TYPE);



  • } else {



  •      if (submissionContext.getApplicationType().length() >                 



  •      YarnConfiguration.APPLICATION_TYPE_LENGTH) {



  •      submissionContext.setApplicationType(submissionContext



  •      .getApplicationType().substring(0,



  •      YarnConfiguration.APPLICATION_TYPE_LENGTH));



  •      }



  • }







  • try {



  • // call RMAppManager to submit application directly



  • //让RMAppManager立即提交应用



  • //关于ApplicationManager大家可以参考我基础组件分析的那一章节



  • rmAppManager.submitApplication(submissionContext,



  • System.currentTimeMillis(), user);







  • LOG.info("Application with id " + applicationId.getId() +



  • " submitted by user " + user);



  • RMAuditLogger.logSuccess(user, AuditConstants.SUBMIT_APP_REQUEST,



  • "ClientRMService", applicationId);



  • } catch (YarnException e) {



  • LOG.info("Exception in submitting application with id " +



  • applicationId.getId(), e);



  • RMAuditLogger.logFailure(user, AuditConstants.SUBMIT_APP_REQUEST,



  • e.getMessage(), "ClientRMService",



  • "Exception in submitting application", applicationId);



  • throw e;



  • }







  • SubmitApplicationResponse response = recordFactory



  • .newRecordInstance(SubmitApplicationResponse.class);



  • return response;



  • }


5. 从上列的代码中可知,我们应该进到rmAppManager.submitApplication()去分析,该方法内部有一个createAndPopulateNewRMApp(),我们来看一下



  • private RMAppImpl createAndPopulateNewRMApp(



  • ApplicationSubmissionContext submissionContext, long submitTime,



  • String user, boolean isRecovery) throws YarnException {



  • ApplicationId applicationId = submissionContext.getApplicationId();







  • //检查AM的请求,对资源做检查



  • ResourceRequest amReq =



  • validateAndCreateResourceRequest(submissionContext, isRecovery);







  • // Create RMApp



  • //此处封装了一个状态机,这是yarn的一个重大机制,每个服务随着状态不断改变而做出操作



  • RMAppImpl application =



  • new RMAppImpl(applicationId, rmContext, this.conf,



  • submissionContext.getApplicationName(), user,



  • submissionContext.getQueue(),



  • submissionContext, this.scheduler, this.masterService,



  • submitTime, submissionContext.getApplicationType(),



  • submissionContext.getApplicationTags(), amReq);







  • return application;



  • }



6. 最后提交了一个START事件,上文我们可以知道new了一个RMAppImpl,这里就是触发它的状态机对应事件,这段代码的意思是处理START事件,任务状态从NEW转换到NEW_SAVING并,触发了RMAppNewlySavingTransition转换



  • .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,



  • RMAppEventType.START, new RMAppNewlySavingTransition())


7. 那很明显,我们要看RMAppNewlySavingTransition(),点进去看内部的代码很简单,代码



  • private static final class RMAppNewlySavingTransition extends RMAppTransition {



  • @Override



  • public void transition(RMAppImpl app, RMAppEvent event) {



  • ///如果恢复配置被启用,那么将应用程序信息存储在非阻塞调用中,



  • // 因此要确保RM已经存储了在RM重新启动后能够重启AM所需的信息,而无需再次与客户端通信







  • LOG.info("Storing application with id " + app.applicationId);



  • app.rmContext.getStateStore().storeNewApplication(app);



  • }



  • }


8. 那我们来看这个存储App信息的方法做了些什么,触发了一个STORE_APP事件,由StoreAppTransition处理



  • public void storeNewApplication(RMApp app) {



  • ApplicationSubmissionContext context = app.getApplicationSubmissionContext();



  • assert context instanceof ApplicationSubmissionContextPBImpl;



  • ApplicationStateData appState =



  • ApplicationStateData.newInstance(



  • app.getSubmitTime(), app.getStartTime(), context, app.getUser());



  • dispatcher.getEventHandler().handle(new RMStateStoreAppEvent(appState));



  • }


9. 可以看到内部有一段代码,通知rm提交APP_NEW_SAVED事件,这个事件由AddApplicationToSchedulerTransition
处理



  • try {



  • store.storeApplicationStateInternal(appId, appState);



  • store.notifyApplication(new RMAppEvent(appId,



  • RMAppEventType.APP_NEW_SAVED));



  • } catch (Exception e) {



  • LOG.error("Error storing app: " + appId, e);



  • isFenced = store.notifyStoreOperationFailedInternal(e);



  • }


10. 这个事件很简单从表面意思也能读懂,就是将应用程序交个调度器去处理,所以他提交了一个APP_ADDED事件,我们分析默认调度器Capatity Scheduler,所以此时就去看Capatity中的代码



  • case APP_ADDED:



  • {



  • AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;



  • String queueName =



  • resolveReservationQueueName(appAddedEvent.getQueue(),



  • appAddedEvent.getApplicationId(),



  • appAddedEvent.getReservationID());



  • if (queueName != null) {



  • if (!appAddedEvent.getIsAppRecovering()) {



  • //这里是告知队列有任务提交了,队列会统计任务数量



  • addApplication(appAddedEvent.getApplicationId(), queueName,



  • appAddedEvent.getUser());



  • } else {



  • addApplicationOnRecovery(appAddedEvent.getApplicationId(), queueName,



  • appAddedEvent.getUser());



  • }



  • }



  • // 提交了APP_ACCEPTED事件



  • queue.getMetrics().submitApp(user);



  • SchedulerApplication<FiCaSchedulerApp> application =



  • new SchedulerApplication<FiCaSchedulerApp>(queue, user);



  • applications.put(applicationId, application);



  • LOG.info("Accepted application " + applicationId + " from user: " + user



  • + ", in queue: " + queueName);



  • rmContext.getDispatcher().getEventHandler()



  • .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));



  • }


11. 从上面的代码可以看出,Capatity调度器提交了事件APP_ACCEPTED,状态从SUBMITTED转成了ACCEPTED并触发



  • StartAppAttemptTransition()



  • .addTransition(RMAppState.SUBMITTED, RMAppState.ACCEPTED,



  • RMAppEventType.APP_ACCEPTED, new StartAppAttemptTransition())


12. 这个类在内部创建了一个新的RMAppAttempt,然后提交了事件RMAppAttemptEventType.START,触发了AttemptStartedTransition(),很明显这个类对象使我们刚new出来的,那匹配的状态机的初始状态就是NEW,现在由于提交了START事件,状态变为了SUBMITTED



  • // Transitions from NEW State



  • .addTransition(RMAppAttemptState.NEW, RMAppAttemptState.SUBMITTED,



  • RMAppAttemptEventType.START, new AttemptStartedTransition())


13. 那我们来看看这个方法AttemptStartedTransition(),其实我们要看一下registerAppAttempt方法



  • //注册AM的service



  • appAttempt.masterService



  • .registerAppAttempt(appAttempt.applicationAttemptId);







  • // Add the applicationAttempt to the scheduler and inform the scheduler



  • // whether to transfer the state from previous attempt.



  • appAttempt.eventHandler.handle(new AppAttemptAddedSchedulerEvent(



  • appAttempt.applicationAttemptId, transferStateFromPreviousAttempt));


14. 我们看到把response设置成了-1,这个Id会在AM后面每次的通信中自增,会借助这个id来判断请求是重复请求还是新的请求,还是旧的请求。



  • AllocateResponse response =



  • recordFactory.newRecordInstance(AllocateResponse.class);



  • // set response id to -1 before application master for the following



  • // attemptID get registered



  • response.setResponseId(-1);



  • LOG.info("Registering app attempt : " + attemptId);



  • responseMap.put(attemptId, new AllocateResponseLock(response));



  • rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId);


15. 回到AttemptStartedTransition()方法中,最后它提交了一个事件SchedulerEventType.APP_ATTEMPT_ADDED,这个事件交回给Capatity调度器去处理



  • case APP_ATTEMPT_ADDED:



  • {



  • AppAttemptAddedSchedulerEvent appAttemptAddedEvent =



  • (AppAttemptAddedSchedulerEvent) event;



  • addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),



  • appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),



  • appAttemptAddedEvent.getIsAttemptRecovering());



  • }


16. 那我们自然是进入addApplicationAttempt方法去分析,内部我选了部分代码做分析,下面这段new了一个FiCaSchedulerApp,在内部设置了AM启动资源信息



  • FiCaSchedulerApp attempt =



  • new FiCaSchedulerApp(applicationAttemptId, application.getUser(),



  • queue, queue.getActiveUsersManager(), rmContext);


17. 设置完后提交了RMAppAttemptEventType.ATTEMPT_ADDED事件



  • rmContext.getDispatcher().getEventHandler().handle(



  • new RMAppAttemptEvent(applicationAttemptId,



  • RMAppAttemptEventType.ATTEMPT_ADDED));


这里的意思是提交了ATTEMPT_ADDED事件使得状态从SUBMITTED转变,转变的结果可能有LAUNCHED_UNMANAGED_SAVING或者SCHEDULED,而后状态机会根据返回的不同状态信息再做处理



  • .addTransition(RMAppAttemptState.SUBMITTED,



  • EnumSet.of(RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING,



  • RMAppAttemptState.SCHEDULED),



  • RMAppAttemptEventType.ATTEMPT_ADDED,



  • new ScheduleTransition())


18. 我们接着分析ScheduleTransition(),if入口的开关subCtx.getUnmanagedAM()是获取RM是否应该管理AM的执行。如果为真,那么RM将不会为AM分配一个容器并启动它,默认是false。那很明显我们这里要返回的状态是SCHEDULED



  • ApplicationSubmissionContext subCtx = appAttempt.submissionContext;



  • //获取RM是否应该管理AM的执行。如果为真,那么RM将不会为AM分配一个容器并启动它,默认是false



  • if (!subCtx.getUnmanagedAM()) {



  • //在创建新的尝试之前需要重置容器,因为这个请求将被传递给调度器,调度器将在AM容器分配后扣除这个数字



  • appAttempt.amReq.setNumContainers(1);



  • appAttempt.amReq.setPriority(AM_CONTAINER_PRIORITY);



  • /* 表示为任一机器 */



  • appAttempt.amReq.setResourceName(ResourceRequest.ANY);



  • appAttempt.amReq.setRelaxLocality(true);







  • //调度器分配资源



  • Allocation amContainerAllocation =



  • appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,



  • Collections.singletonList(appAttempt.amReq),



  • EMPTY_CONTAINER_RELEASE_LIST, null, null);



  • if (amContainerAllocation != null



  • && amContainerAllocation.getContainers() != null) {



  • assert (amContainerAllocation.getContainers().size() == 0);



  • }



  • return RMAppAttemptState.SCHEDULED;



  • } else {



  • // save state and then go to LAUNCHED state



  • appAttempt.storeAttempt();



  • return RMAppAttemptState.LAUNCHED_UNMANAGED_SAVING;



  • }


19. 上面的代码中应该能看到一行令人振奋的代码appAttempt.scheduler.allocate(),这里做的是资源的调度,我们这不做详细的分析,在后文AM申请资源时也会调用这个接口申请剩下的Container,后文会有详细的介绍,我们刚刚知道了上文返回了SCHEDULED状态,之前添加转换的方法是会根据返回的状态形成新的转换,这个时候就会调用到下面这个转换,触发了AMContainerAllocatedTransition()



  • .addTransition(RMAppAttemptState.SCHEDULED,



  • EnumSet.of(RMAppAttemptState.ALLOCATED_SAVING,



  • RMAppAttemptState.SCHEDULED),



  • RMAppAttemptEventType.CONTAINER_ALLOCATED,



  • new AMContainerAllocatedTransition())


20. 具体的分析见代码块,发现在这里也调用了allocate,但是传入没有传入请求,在allocate方法中做了判断的,如果传入的空的请求就是去尝试获取之前申请过的容器,而不是再做一次资源调度



  • // Acquire the AM container from the scheduler.



  • //从调度器获取AM容器



  • Allocation amContainerAllocation =



  • appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,



  • EMPTY_CONTAINER_REQUEST_LIST, EMPTY_CONTAINER_RELEASE_LIST, null,



  • null);



  • //至少分配一个容器,因为一个container_allocation是在构建一个RMContainer之后发出的,



  • // 并将其放入到requerapplication # newallocatedcontainers中。



  • // 注意,YarnScheduler#分配不能保证能够获取它,



  • // 因为由于某些原因(如DNS不可用导致未生成容器令牌)容器可能无法获取。



  • // 因此,我们返回到以前的状态并继续重试,直到获取am容器。



  • if (amContainerAllocation.getContainers().size() == 0) {



  • appAttempt.retryFetchingAMContainer(appAttempt);



  • return RMAppAttemptState.SCHEDULED;



  • }







  • // Set the masterContainer



  • appAttempt.setMasterContainer(amContainerAllocation.getContainers()



  • .get(0));



  • RMContainerImpl rmMasterContainer = (RMContainerImpl)appAttempt.scheduler



  • .getRMContainer(appAttempt.getMasterContainer().getId());



  • rmMasterContainer.setAMContainer(true);



  • // NMTokenSecrentManager中的节点集用于标记该节点是否已向AM发出NMToken。



  • // 当AM容器分配给RM本身时,分配这个AM容器的节点被标记为已经发送的NMToken。



  • // 因此,清除这个节点集,以便以下来自AM的分配请求能够检索相应的NMToken。



  • appAttempt.rmContext.getNMTokenSecretManager()



  • .clearNodeSetForAttempt(appAttempt.applicationAttemptId);



  • appAttempt.getSubmissionContext().setResource(



  • appAttempt.getMasterContainer().getResource());



  • appAttempt.storeAttempt();







  • return RMAppAttemptState.ALLOCATED_SAVING;


21. 我们看到最终返回了ALLOCATED_SAVING,与之前一样根据返回的状态触发另一个事件



  • .addTransition(RMAppAttemptState.ALLOCATED_SAVING,



  • RMAppAttemptState.ALLOCATED,



  • RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())



  • 这个事件终于看到了启动的方法,launchAttempt()这个方法内部提交了一个LAUNCH事件



  • private static final class AttemptStoredTransition extends BaseTransition {



  • @Override



  • public void transition(RMAppAttemptImpl appAttempt,



  • RMAppAttemptEvent event) {







  • appAttempt.registerClientToken();



  • appAttempt.launchAttempt();



  • }



  • }


22. 走到这,我们终于发现了令人振奋的类ApplicationMasterLauncher,刚刚提交了LAUNCH事件,自然走launch()方法



  • AMLauncherEventType event = appEvent.getType();



  • RMAppAttempt application = appEvent.getAppAttempt();



  • switch (event) {



  • case LAUNCH:



  • launch(application);



  • break;



  • case CLEANUP:



  • cleanup(application);



  • break;



  • default:



  • break;



  • }


23. 在这里我们首先要分析一下ApplicationMasterLauncher的初始化和启动,这个属于RM的子服务,那在Yarn源码剖析(一) --- RM与NM服务启动以及心跳通信我们也提到过,RM会逐一初始化和启动它的子服务,很明显这里最重要的是启动了一个线程用来处理相关的事件,那我们来看一下线程的run方法



  • @Override



  • protected void serviceInit(Configuration conf) throws Exception {



  • int threadCount = conf.getInt(



  • YarnConfiguration.RM_AMLAUNCHER_THREAD_COUNT,



  • YarnConfiguration.DEFAULT_RM_AMLAUNCHER_THREAD_COUNT);



  • ThreadFactory tf = new ThreadFactoryBuilder()



  • .setNameFormat("ApplicationMasterLauncher #%d")



  • .build();



  • launcherPool = new ThreadPoolExecutor(threadCount, threadCount, 1,



  • TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>());



  • launcherPool.setThreadFactory(tf);







  • Configuration newConf = new YarnConfiguration(conf);



  • newConf.setInt(CommonConfigurationKeysPublic.



  • IPC_CLIENT_CONNECT_MAX_RETRIES_ON_SOCKET_TIMEOUTS_KEY,



  • conf.getInt(YarnConfiguration.RM_NODEMANAGER_CONNECT_RETIRES,



  • YarnConfiguration.DEFAULT_RM_NODEMANAGER_CONNECT_RETIRES));



  • setConfig(newConf);



  • super.serviceInit(newConf);



  • }







  • @Override



  • protected void serviceStart() throws Exception {



  • launcherHandlingThread.start();



  • super.serviceStart();



  • }



  • 可以看到run方法是逐一从masterEvents队列中取出事件进行处理



  • while (!this.isInterrupted()) {



  • Runnable toLaunch;



  • try {



  • toLaunch = masterEvents.take();



  • launcherPool.execute(toLaunch);



  • } catch (InterruptedException e) {



  • LOG.warn(this.getClass().getName() + " interrupted. Returning.");



  • return;



  • }



  • }


24. 这个时候我们回到之前的lunch()方法,很明显,内部调用了createRunnableLauncher,new了一个AMLauncher,并传入 AMLauncherEventType.LAUNCH事件,最后由ApplicationMasterLauncher线程来处理



  • private void launch(RMAppAttempt application) {



  • Runnable launcher = createRunnableLauncher(application,



  • AMLauncherEventType.LAUNCH);



  • masterEvents.add(launcher);



  • }







  • protected Runnable createRunnableLauncher(RMAppAttempt application,



  • AMLauncherEventType event) {



  • Runnable launcher =



  • new AMLauncher(context, application, event, getConfig());



  • return launcher;



  • }


25. 那就会触发AMLauncher的run方法,里面有一个lunch()方法,以及提交了一个事件RMAppAttemptEventType.LAUNCHED,这个事件的提交是为了启动AM监控线程的,所以就不做分析了,重点来看lunch()方法



  • case LAUNCH:



  • try {



  • LOG.info("Launching master" + application.getAppAttemptId());



  • launch();



  • handler.handle(new RMAppAttemptEvent(application.getAppAttemptId(),



  • RMAppAttemptEventType.LAUNCHED));



  • }


26. 这里终于取出了随spark-submit传入的启动AM的上下文,并放在了StartContainerRequest请求中,然后利用调用了startContainers方法



  • ContainerLaunchContext launchContext =



  • createAMContainerLaunchContext(applicationContext, masterContainerID);







  • StartContainerRequest scRequest =



  • StartContainerRequest.newInstance(launchContext,



  • masterContainer.getContainerToken());



  • List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();



  • list.add(scRequest);



  • StartContainersRequest allRequests =



  • StartContainersRequest.newInstance(list);







  • StartContainersResponse response =



  • containerMgrProxy.startContainers(allRequests);


27. 终于开始启动AM所在的Container了,这里由ContainerManagerImpl实现,首先内部做了一些校验,执行了关键方法startContainerInternal(nmTokenIdentifier, containerTokenIdentifier, request);这段代码非常的多,所以也只选取关键的部分,我们看到它提交了事件INIT_APPLICATION。跟着代码看进去,发现最终调用了RequestResourcesTransition()方法,我们这里不分

析资源本地化的特性,有兴趣了解的可以自己查阅相关的资料,这个方法的篇幅很长,所以我选了关键的代码来分析,container.sendLaunchEvent()内部提交了ContainersLauncherEventType.LAUNCH_CONTAINER事件,这个事件交由ContainerLuncher类来处理



  • container.sendLaunchEvent();



  • container.metrics.endInitingContainer();



  • return ContainerState.LOCALIZED;


28.  containerLuncher是一个线程池对线,所以这里非常清楚的看到,new了一个ContainerLuncher对线交由线程池来处理,这里再提一下,前文也涉及到过,spark自己封装的AM启动上下文就是在这里传进去来启动AM的



  •       case LAUNCH_CONTAINER:



  •         Application app =



  •           context.getApplications().get(



  •               containerId.getApplicationAttemptId().getApplicationId());







  •         ContainerLaunch launch =



  •             new ContainerLaunch(context, getConfig(), dispatcher, exec, app,



  •               event.getContainer(), dirsHandler, containerManager);



  •         containerLauncher.submit(launch);



  •         running.put(containerId, launch);



  •         break;


29. 那到这,AM的启动基本就结束了,关于我们ContainerLuncher线程到底做了什么,大家可以自己去看内部的call()方法,这里我也不做赘述了。

总结

本文讲述了AM启动的全过程,内部的代码真的很复杂,也涉及到许多别的模块的的东西,蛋挞在这并没有全部分析,如果要统筹分析会使得思路变得混乱,对于一些蛋挞感兴趣的模块如状态机、rpc通信这些,在后续Yarn的研究中也会慢慢的学习的。后文将要介绍AM是如何注册到RM上,以及AM申请Container和Container的启动。



1 个回复

倒序浏览
奈斯
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马