一,基本使用 1,RDD分布式数据集的五大特性 1),A list of partitions(一系列的分区) 2),A function for computing each split(计算每个分片的方法) 3),A list of dependencies on other RDDs(一系列的依赖RDD) 4),Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned) (可选,对于key-value类型的RDD都会有一个分区器) 5),Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)(可选,最佳位置) 2,RDD的操作类型: Transformations:转换操作,lazy型,不会触发计算 Action:触发job Persist:缓存,也不会触发job,在第一次触发job之后才会真正进行缓存。 3,RDD的计算 RDD的计算实际上我们可以分为两个大部分: 1),Driver端的计算 主要是stage划分,task的封装,task调度执行 2),Executor端的计算 真正的计算开始,默认情况下每个cpu运行一个task。一个task实际上就是一个分区,我们的方法无论是转换算子里封装的,还是action算子里封装的都是此时在一个task里面计算一个分区的数据。 下面就那这两个例子,开始讲解吧,针对转换类型的操作可以类比查看。 这两个算子里面,上述我说的”我们的方法是”,每个算子圆括号内部的所有内容。 二,源码相关 1,第一次封装 可以看到方法通过clean操作(清理闭包,为序列化和网络传输做准备),进行了一层匿名函数的封装, 针对foreach方法,是我们的方法被传入了迭代器的foreach(每个元素遍历执行一次函数), 而对于foreachpartiton方法是迭代器被传入了我们的方法(每个分区执行一次函数,我们获取迭代器后需要自行进行迭代处理,也即上述第二个demo的partition.foreach)。 2,第二次封装 这次很统一就在 就是讲上述进一步封装的方法进一步按照匿名函数封装 (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it) 3,执行的时候 Spark的Task类型我们用到的也就两个 1),ShuffleMapTask 2),ResultTask Action算子的方法执行是在ResultTask中执行的,也即ResultTask的runTask方法。 首先反序列化得到我们的方法(2步骤封装的)和RDD,然后执行。传入的是迭代器 三,总结 RDD.foreach(foreachFunction) RDD.foreachPatition(foreachPartitionFunction) 经过第二步的分析我们可以理解,展开之后实际上就是 RDD的每个分区的iterator(集合): iterator.foreach(foreachFunction) foreachPartitionFunction(iterator) 这就很明显了,假如我们的Function中有数据库,网络TCP等IO链接,文件流等等的创建关闭操作,采用foreachPatition方法,针对每个分区集合进行计算,更能提高我们的性能。
|