关于spark-sql模块 spark-sql模块下的代码作用有: - sparkSession,SQLContext,DataSet,DataFrameWriter和reader等类
- api 包: python,r的api
- catalog包: catalog以及column,table,function,database的接口类
- expression包: 包括aggregator,UDF, UDAF以及窗口函数
- internal包:包括catalogImpl, HiveSerde,sessionState,sharedState等internal类
- jdbc包: 里面都是方言类 dialect
- sources包: 用于下推至DataSource的filter以及一些DataSource和sql relation相关接口
- streaming包: DataStreamReader, writer,StreamingQueryException等于streaming有关的类。
- util包: QueryExecutionListener类
- execution包:本文的重点
execution包该包下源码分为: - 直接在根目录下
- 基本物理操作: coalesceExec, filterExec, projectExec, unionExec, RangeExec, and etc.
- cacheManager:用于cache table.
- 一些exec: sort, DataSourceScan
- wholeStageCodegenExec
- SparkPlanner用于生产物理计划的
- limit以及其他操作
- aggregate包: 当然是与聚合相关的exec 以及UDAF
- arrow包: arrow也是一种列式存储格式,这个包有他的 writer以及工具类.
- columnar包: 与列状态,访问,类型相关的类
- command包: 一些命令,ddl, analyzeTableCommand, functions, create等命令
- DataSources包: 与DataSource有关,parquet, jdbc, orc等等
- 关于DataSource options, writer, 工具类等等
- exchange: 里面有broadcastExchangeExec, exchangeCoordinator,shuffleExchangeExec等相关类,后面会重点分析这个,其实在物理计划中转换的重点就是这部分,所以ensureRequirements就在这个包中。
- joins: join相关的, hashJoin, broadcastHashJoin, broadcastNestedLoopJoin, sortMergeJoin,shuffleHashJoinExec.
- streaming包: 应该是continuous streaming相关的底层实现,应该不是走rdd,是一个真正流式的实现.
- window包: 窗口函数相关exec。
- python包,r包,metric包。
下文重点分析 aggregate, exchange, joins包。 exchange包Exchange & ExchangeCoordinator首先讲一下Exchange,顾名思义就是交换,是为了在多个线程之间进行数据交换,完成并行。 Exchange分为两种,一种是BroadcastExchange另外一种是ShuffleExchange。Broadcast就是将数据发送至driver,然后由driver广播,这适合于数据量较小时候的shuffle。另一种ShuffleExchange就比较常见了,就是多对多的分发。 如果开启了spark.sql.adaptive.enabled,也就是自适应执行, 那么在使用ShuffleExchange的时候有对应的ExchangeCoordinator;如果没开启ae,那就不需要协调器。 ExchangeCoordinator顾名思义,是Exchange协调器,是一个用于决定怎么在stage之间进行shuffle数据的coordinator。这个协调器用于决定之后的shuffle有多少个partition用来需要fetch shuffle 数据。 一个coordinator有三个参数. - numExchanges
- targetPostShuffleInputSize
- minNumPostShufflePartitions
第一个参数是用于表示有多少个ShuffleExchangeExec需要注册到这个coordinator里面。因此,当我们要开始真正执行时,我们需要知道到底有多少个ShuffleExchangeExec。 第二个参数是表示后面shuffle阶段每个partition的输入数据大小,这是用于adaptive-execution的,用于推测后面的shuffle阶段需要多少个partition。可以通过spark.sql.adaptive.shuffle.targetPostShuffleInputSize来配置。 第三个参数是一个可选参数,表示之后shuffle阶段最小的partition数量。如果这个参数被定义,那么之后的shuffle阶段的partition数量不能小于这个值。 Coordinator的流程如下: - 在一个SparkPlan执行之前,对于一个ShuffleExchangeExec操作,如果它被指定了一个coorinator,那么它将会注册到这个协调器,这发生在doPrepare方法里.
- 当开始执行SparkPlan,注册到这个协调器里面的ShuffleExchangeExec将会调用postShuffleRDD方法来相应的 post-shuffle ShuffledRowRDD.如果这个协调器已经决定了如何去shuffle data,那么这个Exec会马上获得它对应的ShuffledRowRDD.
- 如果这个coordinator已经决定了如何shuffle data,它会让注册到自己的ShuffleExchangeExecs来提交pos-shuffle stage。然后基于pre-shuffle阶段partition的统计信息,来决定post-shuffle的partition数量,如果post-shuffle需要,它也会将一些需要的连续的partitions放在一起发送给post-shuffle.
- 最后,这个coordinator会为所有注册的ShuffleExchangeExec创建post-shuffle ShuffledRowRDD。
目前的ae是比较老版本的ae,intel有一个ae项目,相信会在spark-3.0之后会合入,可以了解一下新的ae。 EnsureRequirements这就是前面说的在SparkPlan 之前的do-prepare,需要为sparkplan的可执行做一些准备工作。 主要分为以下几部分: - 确定distribution和Ordering
- 确定join 的条件和join keys出现顺序匹配
- 创建coordinator
- ae开启
- 支持Coordinator
- 有ShuffleExchangeExec且是HashPartitionings
- 支持Distribution并且child个数大于1
- 针对ShuffleExchangeExec创建coordinator
- 针对一个post-shuffle 对应几个pre-shuffle的创建coordinator,例如join 一个对应多个pre-shuffle,而几个pre-shuffle有不同的分区划分方式
Join这里的join是执行阶段的join,不是解析阶段的join。 join包括BroadcastHashJoinExec, ShuffledHashJoinExec以及SortMergeSortMergeJoinExec。 join策略的选择SparkStrategies类里面,面临join,首先会尝试BroadcastJoin,然后是ShuffledHashJoin最后才是SortMergeHashJoin,只要能满足前面的条件就会优先使用前面的join。 而这些JoinExec是在前面构建物理之前进行构建,在之后真正执行物理计划的时候执行。 AggregateAgg和join有些类似,都是需要进行shuffle操作,但不同的是Aggregate可以是一个一元操作,而join是多元操作。 Aggregate操作例如 max, count, min, sum,groupBy, reduce, reduceBy 以及一些UDAF(User Defined Aggregate Function)。而groupBy这些操作可能面临order by. Conclusion本文大概讲解了下execution包中各个部分的用途,重点是如何进行Exchange,以及什么是ExchangeCoordinator。对于join和Aggregate未涉及太多。 本文转载来自:https://www.turbofei.wang/spark/2019/05/20/spark-sql-execution
|