黑马程序员技术交流社区

标题: 【上海校区】Spark Sql Execution [打印本页]

作者: 玩转曼哈顿    时间: 2019-6-24 10:50
标题: 【上海校区】Spark Sql Execution
关于spark-sql模块
spark-sql模块下的代码作用有:
execution包
该包下源码分为:
下文重点分析 aggregate, exchange, joins包。
exchange包Exchange & ExchangeCoordinator
首先讲一下Exchange,顾名思义就是交换,是为了在多个线程之间进行数据交换,完成并行。
Exchange分为两种,一种是BroadcastExchange另外一种是ShuffleExchange。Broadcast就是将数据发送至driver,然后由driver广播,这适合于数据量较小时候的shuffle。另一种ShuffleExchange就比较常见了,就是多对多的分发。
如果开启了spark.sql.adaptive.enabled,也就是自适应执行,
那么在使用ShuffleExchange的时候有对应的ExchangeCoordinator;如果没开启ae,那就不需要协调器。
ExchangeCoordinator顾名思义,是Exchange协调器,是一个用于决定怎么在stage之间进行shuffle数据的coordinator。这个协调器用于决定之后的shuffle有多少个partition用来需要fetch shuffle 数据。
一个coordinator有三个参数.
第一个参数是用于表示有多少个ShuffleExchangeExec需要注册到这个coordinator里面。因此,当我们要开始真正执行时,我们需要知道到底有多少个ShuffleExchangeExec。
第二个参数是表示后面shuffle阶段每个partition的输入数据大小,这是用于adaptive-execution的,用于推测后面的shuffle阶段需要多少个partition。可以通过spark.sql.adaptive.shuffle.targetPostShuffleInputSize来配置。
第三个参数是一个可选参数,表示之后shuffle阶段最小的partition数量。如果这个参数被定义,那么之后的shuffle阶段的partition数量不能小于这个值。
Coordinator的流程如下:
目前的ae是比较老版本的ae,intel有一个ae项目,相信会在spark-3.0之后会合入,可以了解一下新的ae。
EnsureRequirements
这就是前面说的在SparkPlan 之前的do-prepare,需要为sparkplan的可执行做一些准备工作。
主要分为以下几部分:
Join
这里的join是执行阶段的join,不是解析阶段的join。
join包括BroadcastHashJoinExec, ShuffledHashJoinExec以及SortMergeSortMergeJoinExec。
join策略的选择SparkStrategies类里面,面临join,首先会尝试BroadcastJoin,然后是ShuffledHashJoin最后才是SortMergeHashJoin,只要能满足前面的条件就会优先使用前面的join。
而这些JoinExec是在前面构建物理之前进行构建,在之后真正执行物理计划的时候执行。
Aggregate
Agg和join有些类似,都是需要进行shuffle操作,但不同的是Aggregate可以是一个一元操作,而join是多元操作。
Aggregate操作例如 max, count, min, sum,groupBy, reduce, reduceBy 以及一些UDAF(User Defined Aggregate Function)。而groupBy这些操作可能面临order by.
Conclusion
本文大概讲解了下execution包中各个部分的用途,重点是如何进行Exchange,以及什么是ExchangeCoordinator。对于join和Aggregate未涉及太多。
本文转载来自:https://www.turbofei.wang/spark/2019/05/20/spark-sql-execution








欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) 黑马程序员IT技术论坛 X3.2