spark-submit脚本 1. 先来观察一下任务提交时的spark-submit脚本中各个参数的含义(并没列举所有,只列举了关键的几个参数)
/spark/bin/spark-submit \
--master yarn \ //提交模式,显而易见我们是提交到yarn上
--deploy-mode cluster \ //运行的模式,还有一种client模式,但大多用于调试,此处使用cluster模式
--class org.apache.spark.test \ //提交的任务
--name "test" \ //任务名字
--queue root.default \ //提交的队列
--driver-memory 3g \ //为driver申请的内存
--num-executors 1 \ //executors的数量,可以理解为线程数,对应yarn中的Container个数
--executor-memory 6g \ //为每一个executor申请的内存
--executor-cores 4 \ //为每一个executor申请的core
--conf spark.yarn.driver.memoryOverhead=1g \ //driver可使用的非堆内存,这些内存用于如VM,字符 串常量池以及其他额外本地开销等
--conf spark.yarn.executor.memoryOverhead=2g \ //每个executor可使用的非堆内存,这些内存用于如 VM,字符串常量池以及其他额外本地开销等
2. 用户按照自己的需求提交了该脚本,然后进入到任务提交阶段,实现类是org.apache.spark.deploy.SparkSubmit:
从main方法中可以看到这段代码,如果是SUBMIT,则调用submit
调用顺序:main--> submit(appArgs)->runMain(childArgs, childClasspath,sysProps,hildMainClass, args.verbose)
appArgs.action match {
case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
case SparkSubmitAction.KILL => kill(appArgs)
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
}
3. 在runMain方法中会先进入prepareSubmitEnvironment方法,在该方法中得知,如果运行的模式是yarn则会使用org.apache.spark.deploy.yarn.YarnClusterApplication类作为提交任务到yarn程序的入口。其实prepareSubmitEnvironment方法中就是设置参数、准备submit环境之类的事情,代码量较大,得知在这个方法中得知了相关yarn的启动类后就不再多做分析了。
4. YarnClusterApplication这是org.apache.spark.deploy.yarn.Client中的一个内部类,在YarnClusterApplication中new了一个Client对象,并调用了run方法
private[spark] class YarnClusterApplication extends SparkApplication {
override def start(args: Array[String], conf: SparkConf): Unit = {
//SparkSubmit在yarn模式下会使用yarn缓存去处理文件和jar包,因此在这里将它们从sparkConf中移除
conf.remove("spark.jars")
conf.remove("spark.files")
new Client(new ClientArguments(args), conf).run()
}
}
5. 在run方法中会调用submitApplication方法,此方法则是实现向yarn中的ResourceManager(后文全部简称RM),提交运行任务,并且运行我们的ApplicationMaster(后文简称AM),在该方法中初始化并启动了yarnClient用以使用yarn提供的各种API,而在submitApplication内部有两个关键的方法,最终调用yarnClient.submitApplication(appContext)向yarn提交任务启动的请求。
// 关键是这两个方法:
// 1. 创建ApplicationMaster ContainerLaunch上下文,
// 将ContainerLaunch命令、jar包、java变量等环境准备完毕;
val containerContext = createContainerLaunchContext(newAppResponse)
// 2. 创建Application提交至YARN的上下文,主
// 要读取配置文件设置调用YARN接口前的上下文变量。
val appContext = createApplicationSubmissionContext(newApp, containerContext)
// Finally, submit and monitor the application
logInfo(s"Submitting application $appId to ResourceManager")
yarnClient.submitApplication(appContext) //提交应用程序
launcherBackend.setAppId(appId.toString)
reportLauncherState(SparkAppHandle.State.SUBMITTED)
6. 在createContainerLaunchContext内部初始化了ApplicationMaster所需的资源,环境变量等,所以不做赘述,我们直接来看ApplicationMaster的main()方法,内部new了一个AM对象以及执行run方法,并且在run方法中根据任务选择的模式再选择对应的方法,我们的任务是Cluster模式所以我们进入runDriver方法。
def main(args: Array[String]): Unit = {
SignalUtils.registerLogger(log)
val amArgs = new ApplicationMasterArguments(args)
master = new ApplicationMaster(amArgs)
System.exit(master.run())
}
if (isClusterMode) {
runDriver() //yarn-cluster
} else {
runExecutorLauncher() //yarn-client
}
7. 在runDriver方法中执行了一个registerAM方法(),该方法中有两个重要的方法
allocator = client.register(driverUrl, driverRef, yarnConf, _sparkConf, uiAddress,
historyAddress, securityMgr, localResources) //向RM注册AM
rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))
allocator.allocateResources() //为executor分配资源
8. 先看第一个方法client.register,该方法向RM注册AM(是调用yarn中AMRMClient的API实现的,具体的实现我会在后续的章节中介绍),并申请供AM使用的Container
amClient = AMRMClient.createAMRMClient()
amClient.init(conf)
amClient.start()
this.uiHistoryAddress = uiHistoryAddress
val trackingUrl = uiAddress.getOrElse {
if (sparkConf.get(ALLOW_HISTORY_SERVER_TRACKING_URL)) uiHistoryAddress else ""
}
logInfo("Registering the ApplicationMaster")
synchronized {
amClient.registerApplicationMaster(Utils.localHostName(), 0, trackingUrl)
registered = true
}
9. 再看第二个方法allocator.allocateResources(),该方法拿到了yarn返回给AM的Container集合,然后去处理这些集合使得executor可以在这些Container中启动
updateResourceRequests() //更新同步yarn资源信息
val progressIndicator = 0.1f
//调查ResourceManager。如果没有挂起的容器请求,这将充当心跳.
//此方法就是去yarn查看所有节点可用的资源信息
val allocateResponse = amClient.allocate(progressIndicator)
//获得yarn分配回来的Container
val allocatedContainers = allocateResponse.getAllocatedContainers()
if (allocatedContainers.size > 0) {
logDebug(("Allocated containers: %d. Current executor count: %d. " +
"Launching executor count: %d. Cluster resources: %s.")
.format(
allocatedContainers.size,
numExecutorsRunning.get,
numExecutorsStarting.get,
allocateResponse.getAvailableResources))
//处理获取的Container
handleAllocatedContainers(allocatedContainers.asScala)
}
10. 在这个handleAllocatedContainers方法中,根据一些传入的配置对Container进行了一些设置,然后调用该方法中的runAllocatedContainers(containersToUse)去启动executor,最终真正执行启动Container的是在ExecutorRunnable.run()中。创建了NMClient客户端调用提供的API最终实现在NM上启动Container,具体如何启动Container将在后文中进行介绍。
def run(): Unit = {
logDebug("Starting Executor Container")
nmClient = NMClient.createNMClient()
nmClient.init(conf)
nmClient.start()
startContainer()
}
总结本文通过对spark-submit源码的剖析试图介绍用户运行任务的整个过程。
|