一、Spark提交应用任务的四个阶段: 总共提交的任务分为四个阶段,提交+执行:
1、在分配完毕executor以后,解析代码生成DAG有向无环图;
2、将生成的DAG图提交给DAGScheduler,这个组件在driver内,DAGScheduler负责切分阶段,按照DAG图中的shuffle算子进行stage阶段的切分,切分完毕阶段以后,按照每个阶段分别生成对应task任务的集合,将每个阶段所有的task任务放入到对应的set集合中,提交这个set集合,即一次性提交每个stage阶段的所有任务(每个阶段准备好就提交哪个阶段)
3、将任务的集合提交给taskScheduler【driver端组件】,这个组件会将数据通过集群管理器提交给集群(executor),对任务进行监控,分配资源,负责提交,负责执行,负责故障重试,负责落后任务的重启
4、真正提交到executor端,在executor中进行执行,保存执行过后的数据,或者存储数据
二、Spark的运行流程详解【重点】:♎ spark-submit 提交命令的解析:
通过查看底层,我们可以了解到,使用spark-submit方法提交任务的时候的时候,实际上是运行的 Spark内的 SparkSubmit 类 提交任务的命令: [backcolor=rgb(255, 255, 255) !important][size=1em] | spark-submit --master xxx --class xxx --name xxx xxx.jar input output
|
我们使用命令提交任务的时候,设置的参数实际上就是给SparkSubmit 的参数; A、查看SparkSubmit的源码:spark-submit 提交任务时,实际上是运行sparksubmit中的main方法
spark-submit 提交任务时,实际上是运行sparksubmit中的main方法: 所以--master xxx --class com.bw.spark.wordcount --name xxx xxx.jar input output 这些东西都是main方法的参数
♈ main方法中会接收传入的参数,将传入的参数解析后,使用匹配模式,判断输入的参数并执行不同的操作;
1、submit 提交一个应用任务:spark-submit --master spark://master:7077 --class com.bw.spark.wordcount --name xxx xxx.jar input output
A、可以看到在 submit 方法内解析出了四个参数:①a) childArgs == 子类的参数列表,是提交的参数列表内的 输入参数【input】 和 输出路径 【output】 ,数据用于要运行的jar包内的main方法 b) childClasspath == 子类的类的路径,是 --class 的参数【com.bw.spark】 ,数据用于SparkSubmit 类中的main方法 c) sysProps == 系统属性,即 --master 的参数,指定集群模式【spark://master:7077】,数据用于SparkSubmit 类中的main方法 d) childMainClass == --class 参数中类内的入口(main方法)【wordcount 】,数据用于SparkSubmit 类中的main方法 由SparkSubmit类中的main方法管理并启动自定义的类中的main方法
B、通过 runMain 方法使用解析出来的参数继续向下执行②
C、通过反射机制,将mainClass的字符串转换成一个主类
D、根据主类找到这个类中的main方法
E、通过反射机制执行main方法,将传递近来的参数放置到main方法中执行
总结:其实任务的提交就是运行main方法,解析代码解析main方法,解析到此进入等待状态
2、开始初始化driver端的东西,初始化上下文 ①sparkContext ②DAGScheduler ③TaskScheduler【属于预提交的阶段】A、 SparkContext中需要初始化的组件:
B、 在提交任务的时候就要初始化的重要组件:- DAGScheduler 【划分DAG并按阶段提交】
- TaskScheduler 【任务的向集群提交,监控执行】
- SchedulerBackEnd 【提交任务的通信组件】
1--2、根据部署的集群模式不一样。创建不同的DAGScheduler和TaskScheduler
3、根据部署模式不一样创建的SchedulerBackEnd也不一样,根据资源分配不同的核数: 本地模式时,根据设置的线程数创建不同的SchedulerBackEnd; 集群模式时,不需要设置,线程数由集群管理;
3、组件的创建实例完毕以后,开始解析代码:driver初始化完成以后开始解析代码,(executors已经启动),记录textFile从什么位置开始读取数据,记录每个算子生成rdd的数量,分区个数,逻辑,各个rdd之间的血缘关系,只有遇见真正的action算子才开始执行,会生成DAG有向无环图,rdd就是点,算子就是线;
A、开始将DAG有向无环图提交给DAGScheduler进行阶段的切分:从saveASTextFile开始进入,找到最后一步,可以发现是将任务提交给DAGScheduler,进行任务阶段的切分:
B、到DAGScheduler中进行任务的切分阶段,将每一个准备好的阶段提交给TaskScheduler
Ⅰ、在DAGScheduler中的doOnReceive方法接收传进DAGScheduler的任务,进行任务的处理
Ⅱ、DAGScheduler中负责将任务进行拆分,按照shuffle算子进行拆分不同的stage
❶ 找到最后一个RDD,创建一个resultStage:
❷ 根据最后一个resultStage,找到这个result的父stage,如果父stage为空,那么表明已经到了最开始的stage,直接进行提交 如果父stage不为空,那么继续调用自身以递归的形式进行倒序的向前排查,直到找到初始stage为止;
❸ Stage的划分:通过最后一个rdd向前推,如果这个RDD是宽依赖就将stage+1,如果是窄依赖就将当前stage阶段中的rdd+1;当每个阶段都推衍完毕以后,将每个阶段中的所有的task组成一个taskSet。
❹将每个阶段中的所有的task组成一个taskSet集合: 每个阶段匹配一下,如果是shuffleMapStage就组装一个集合,这个集合中装入的都是shuffleMapTask;如果是resultStage那么这个stage中装入的就是resultTask;
C、将任务集合提交给TaskScheduler,TaskScheduler进行任务的提交到集群中,然后执行操作,负责监控,申请资源,故障重试
TaskScheduler是一个接口:
TaskScheduler的其中一个实现类【TaskSchedulerImpl】: 实现类中存在各项组件方法,实现对任务进行各项初始化与管理,在配置好任务之后提交给Executor 执行;
ctrl+alt+b找到TaskScheduler接口的实现类【TaskSchedulerImpl】: 在taskSchedulerImpl中通过submitTasks方法将任务提交SchedulerBackEnd组件进行提交任务: [backcolor=rgb(255, 255, 255) !important][size=1em]1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
| //提交任务的一个组件:
override def submitTasks(taskSet: TaskSet) {
//取出集合内所有的Task任务
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {
//创建一个Stage管理器,对不同的stage阶段的task任务集合进行管理
val manager = createTaskSetManager(taskSet, maxTaskFailures)
//得到当前处理的stage是第几个阶段的
val stage = taskSet.stageId
val stageTaskSets =
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets(taskSet.stageAttemptId) = manager
val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
ts.taskSet != taskSet && !ts.isZombie
}
if (conflictingTaskSet) {
throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
}
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
if (!isLocal && !hasReceivedTask) {
starvationTimer.scheduleAtFixedRate(new TimerTask() {
override def run() {
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered " +
"and have sufficient resources")
} else {
this.cancel()
}
}
}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
hasReceivedTask = true
}
//将所有的task取出之后,使用 SchedulerBackEnd 通信方式将任务提交给executor执行
backend.reviveOffers()
}
|
D、SchedulerBackEnd 通信组件:接收TaskScheduler 发送的任务,将任务转发给 executorSchedulerBackEnd 是一个接口,这个接口存在两个实现类,一个是本地通信的实现类,一个是负责集群通信的实现类
♈ 集群模式的通信方式:1、接收TaskScheduler 发送的任务 2、遍历将接收到的TaskScheduler 任务进行序列化以用于传输 3、通过rpc协议进行任务传输,到executor端
4、运行任务:【executor开始执行任务】A、 executor的本质就是线程池 B、 Task类包装为多线程类:executor 执行器就是一个线程池,但是这个线程池中能够执行的只有多线程的类,而task任务不是多线程的,所以用一个taskRunner多线程的类对task进行包装后,Task就成为了多线程的,可以放入到线程池中运行!!!!
C、Task在executor中的运行流程:在运行任务的时候调用taskRunner中的run方法,先进行任务的反序列化,计算时间,以及分配资源,然后交给执行器进行执行,将执行完毕的任务从Taskset中去除
D、执行器【runTask方法】:任务真正运行的地方
task 分为 shuffleMapTask 和 resultTask 两种,都在task中调用runTask方法执行:
5、任务管理:driver端是所有应用的老大,他会管理每一个executor中的任务执行;监听,数据管理,任务重启。。。。 TaskScheduler 所有的组件,以及代码,变量等数据都在driver端,只有运行的时候会被传送至executor端,因此在driver中运行的程序+变量,都需要被实例化
三、spark任务的四大调度1、application spark-submit spark-shell提交的任务就是一个应用,会生成一个application 2、job 遇见一个action算子就会生成一个job 3、stage 遇见shuffle就会切分stage, stage 数量 = shuffle算子数+1 4、task 运行任务的最小单位,一个stage中最后一个rdd的分区数量就是这个stage中task任务的个数
四、几个重要的数值:- 读取外部文件的时候,rdd的分区数量是,这个被读取的文件存在多少个block快就有多少个分区;但是当文件只有一个分区时,产生两个分区;
- 每个stage中task的个数取决于最后一个rdd的分区数量
- 写入到hdfs中的文件个数(saveAsTextFile),是存储的rdd的分区数量
- 一个job每个能够运行多少个task任务?这个job区段内每个stage中的最后一个rdd的分区的总和
- 同时并行能够运行多少task任务?集群中总核数,一个线程对应一个Task任务,如果任务数量比总的核数多,则等待执行
- 带有shuffle的算子切分stage,产生的依赖是宽依赖;判断是否是宽依赖的最简单依据就是看算子是否会产生shuffle;
- 不带shuffle的算子,都在一个stage内,产生依赖是窄依赖
- 在整个应用任务流程中,action行动类算子会产生job任务使懒加载开始执行;shuffle算子切割stage,产生不同的pipeline管道形式的stage提交阶段;两者不是对等关系,job 的阶段 ≠ stage的阶段;
|