A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

spark-submit脚本

1. 先来观察一下任务提交时的spark-submit脚本中各个参数的含义(并没列举所有,只列举了关键的几个参数)



  • /spark/bin/spark-submit \



  • --master yarn \ //提交模式,显而易见我们是提交到yarn上



  • --deploy-mode cluster \ //运行的模式,还有一种client模式,但大多用于调试,此处使用cluster模式



  • --class org.apache.spark.test \ //提交的任务



  • --name "test" \ //任务名字



  • --queue root.default \ //提交的队列



  • --driver-memory 3g \ //为driver申请的内存



  • --num-executors 1 \ //executors的数量,可以理解为线程数,对应yarn中的Container个数



  • --executor-memory 6g \ //为每一个executor申请的内存



  • --executor-cores 4 \ //为每一个executor申请的core



  • --conf spark.yarn.driver.memoryOverhead=1g \ //driver可使用的非堆内存,这些内存用于如VM,字符 串常量池以及其他额外本地开销等



  • --conf spark.yarn.executor.memoryOverhead=2g \ //每个executor可使用的非堆内存,这些内存用于如 VM,字符串常量池以及其他额外本地开销等


2. 用户按照自己的需求提交了该脚本,然后进入到任务提交阶段,实现类是org.apache.spark.deploy.SparkSubmit:



  • 从main方法中可以看到这段代码,如果是SUBMIT,则调用submit



  • 调用顺序:main--> submit(appArgs)->runMain(childArgs, childClasspath,sysProps,hildMainClass, args.verbose)







  • appArgs.action match {



  • case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)



  • case SparkSubmitAction.KILL => kill(appArgs)



  • case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)



  • }


3. 在runMain方法中会先进入prepareSubmitEnvironment方法,在该方法中得知,如果运行的模式是yarn则会使用org.apache.spark.deploy.yarn.YarnClusterApplication类作为提交任务到yarn程序的入口。其实prepareSubmitEnvironment方法中就是设置参数、准备submit环境之类的事情,代码量较大,得知在这个方法中得知了相关yarn的启动类后就不再多做分析了。


4. YarnClusterApplication这是org.apache.spark.deploy.yarn.Client中的一个内部类,在YarnClusterApplication中new了一个Client对象,并调用了run方法



  • private[spark] class YarnClusterApplication extends SparkApplication {



  •     override def start(args: Array[String], conf: SparkConf): Unit = {



  •      //SparkSubmit在yarn模式下会使用yarn缓存去处理文件和jar包,因此在这里将它们从sparkConf中移除



  •      conf.remove("spark.jars")



  •      conf.remove("spark.files")



  •      new Client(new ClientArguments(args), conf).run()



  •     }



  • }


5. 在run方法中会调用submitApplication方法,此方法则是实现向yarn中的ResourceManager(后文全部简称RM),提交运行任务,并且运行我们的ApplicationMaster(后文简称AM),在该方法中初始化并启动了yarnClient用以使用yarn提供的各种API,而在submitApplication内部有两个关键的方法,最终调用yarnClient.submitApplication(appContext)向yarn提交任务启动的请求。



  • // 关键是这两个方法:



  • // 1. 创建ApplicationMaster ContainerLaunch上下文,



  • // 将ContainerLaunch命令、jar包、java变量等环境准备完毕;



  • val containerContext = createContainerLaunchContext(newAppResponse)







  • // 2. 创建Application提交至YARN的上下文,主



  • // 要读取配置文件设置调用YARN接口前的上下文变量。



  • val appContext = createApplicationSubmissionContext(newApp, containerContext)







  • // Finally, submit and monitor the application



  • logInfo(s"Submitting application $appId to ResourceManager")



  • yarnClient.submitApplication(appContext) //提交应用程序



  • launcherBackend.setAppId(appId.toString)



  • reportLauncherState(SparkAppHandle.State.SUBMITTED)


6. 在createContainerLaunchContext内部初始化了ApplicationMaster所需的资源,环境变量等,所以不做赘述,我们直接来看ApplicationMaster的main()方法,内部new了一个AM对象以及执行run方法,并且在run方法中根据任务选择的模式再选择对应的方法,我们的任务是Cluster模式所以我们进入runDriver方法。



  • def main(args: Array[String]): Unit = {



  •     SignalUtils.registerLogger(log)



  •     val amArgs = new ApplicationMasterArguments(args)



  •     master = new ApplicationMaster(amArgs)



  •     System.exit(master.run())



  • }







  • if (isClusterMode) {



  •     runDriver() //yarn-cluster



  • } else {



  • runExecutorLauncher() //yarn-client



  • }


7. 在runDriver方法中执行了一个registerAM方法(),该方法中有两个重要的方法



  • allocator = client.register(driverUrl, driverRef, yarnConf, _sparkConf, uiAddress,



  • historyAddress, securityMgr, localResources) //向RM注册AM







  • rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))







  • allocator.allocateResources() //为executor分配资源


8. 先看第一个方法client.register,该方法向RM注册AM(是调用yarn中AMRMClient的API实现的,具体的实现我会在后续的章节中介绍),并申请供AM使用的Container



  • amClient = AMRMClient.createAMRMClient()



  • amClient.init(conf)



  • amClient.start()



  • this.uiHistoryAddress = uiHistoryAddress







  • val trackingUrl = uiAddress.getOrElse {



  • if (sparkConf.get(ALLOW_HISTORY_SERVER_TRACKING_URL)) uiHistoryAddress else ""



  • }







  • logInfo("Registering the ApplicationMaster")



  • synchronized {



  • amClient.registerApplicationMaster(Utils.localHostName(), 0, trackingUrl)



  • registered = true



  • }


9. 再看第二个方法allocator.allocateResources(),该方法拿到了yarn返回给AM的Container集合,然后去处理这些集合使得executor可以在这些Container中启动



  • updateResourceRequests() //更新同步yarn资源信息







  • val progressIndicator = 0.1f



  • //调查ResourceManager。如果没有挂起的容器请求,这将充当心跳.



  • //此方法就是去yarn查看所有节点可用的资源信息



  • val allocateResponse = amClient.allocate(progressIndicator)



  • //获得yarn分配回来的Container



  • val allocatedContainers = allocateResponse.getAllocatedContainers()







  • if (allocatedContainers.size > 0) {



  • logDebug(("Allocated containers: %d. Current executor count: %d. " +



  • "Launching executor count: %d. Cluster resources: %s.")



  • .format(



  • allocatedContainers.size,



  • numExecutorsRunning.get,



  • numExecutorsStarting.get,



  • allocateResponse.getAvailableResources))



  • //处理获取的Container



  • handleAllocatedContainers(allocatedContainers.asScala)



  • }


10. 在这个handleAllocatedContainers方法中,根据一些传入的配置对Container进行了一些设置,然后调用该方法中的runAllocatedContainers(containersToUse)去启动executor,最终真正执行启动Container的是在ExecutorRunnable.run()中。创建了NMClient客户端调用提供的API最终实现在NM上启动Container,具体如何启动Container将在后文中进行介绍。



  • def run(): Unit = {



  • logDebug("Starting Executor Container")



  • nmClient = NMClient.createNMClient()



  • nmClient.init(conf)



  • nmClient.start()



  • startContainer()



  • }


总结

本文通过对spark-submit源码的剖析试图介绍用户运行任务的整个过程。


1 个回复

倒序浏览
奈斯
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马