A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

本帖最后由 彼岸话雨 于 2019-1-26 15:28 编辑

foreach和foreachPartition的区别
一,基本使用
1,RDD分布式数据集的五大特性
       1),A list of partitions(一系列的分区)
       2),A function for computing eachsplit(计算每个分片的方法)
       3),A list of dependencies on otherRDDs(一系列的依赖RDD)
       4),Optionally, a Partitioner forkey-value RDDs (e.g. to say that the RDD is hash-partitioned)
(可选,对于key-value类型的RDD都会有一个分区器)
       5),Optionally, a list of preferredlocations to compute each split on (e.g. block locations for an HDFS file)(可选,最佳位置)
2,RDD的操作类型:
       Transformations:转换操作,lazy型,不会触发计算
       Action:触发job
       Persist:缓存,也不会触发job,在第一次触发job之后才会真正进行缓存。
3,RDD的计算
  RDD的计算实际上我们可以分为两个大部分:
      1),Driver端的计算
     主要是stage划分,task的封装,task调度执行
  2),Executor端的计算
       真正的计算开始,默认情况下每个cpu运行一个task。一个task实际上就是一个分区,我们的方法无论是转换算子里封装的,还是action算子里封装的都是此时在一个task里面计算一个分区的数据。
   下面就那这两个例子,开始讲解吧,针对转换类型的操作可以类比查看。
[Scala] 纯文本查看 复制代码
 jsonRDD.foreach(each=>{
    //连接数据库
    //插入数据库
    //关闭数据库连接
  })
  jsonRDD.foreachPartition(partition=>{
    //此处连接上数据库
    partition.foreach(each=>
    //插入数据
    })
    //关闭数据库连接
  })

这两个算子里面,上述我说的”我们的方法是”,每个算子圆括号内部的所有内容。

二,源码相关
1,第一次封装
[Scala] 纯文本查看 复制代码
/**
* Applies a function f to all elements of  this RDD.
*/
def foreach(f: T => Unit): Unit =  withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) =>  iter.foreach(cleanF))
} 
/**
* Applies a function f to each partition of  this RDD.
*/
def foreachPartition(f: Iterator[T]  => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) =>  cleanF(iter))
}
  

       可以看到方法通过clean操作(清理闭包,为序列化和网络传输做准备),进行了一层匿名函数的封装,针对foreach方法,是我们的方法被传入了迭代器的foreach(每个元素遍历执行一次函数),而对于foreachpartiton方法是迭代器被传入了我们的方法(每个分区执行一次函数,我们获取迭代器后需要自行进行迭代处理,也即上述第二个demo的partition.foreach)。
2,第二次封装
这次很统一
[Scala] 纯文本查看 复制代码
  
/**
* Run a job on a given set of partitions of  an RDD, but take a function of type
* `Iterator[T]  => U` instead of `(TaskContext, Iterator[T]) => U`.
*/
def runJob[T, U: ClassTag](
    rdd:  RDD[T],
    func: Iterator[T]  => U,
    partitions: Seq[Int]):  Array[U] = {
  val cleanedFunc = clean(func)
  runJob(rdd, (ctx:  TaskContext, it: Iterator[T]) =>  cleanedFunc(it), partitions)
}

   就是将上述进一步封装的方法进一步按照匿名函数封装
(ctx:TaskContext, it: Iterator[T]) => cleanedFunc(it)
3,执行的时候
Spark的Task类型我们用到的也就两个
1),ShuffleMapTask
2),ResultTask
Action算子的方法执行是在ResultTask中执行的,也即ResultTask的runTask方法。首先反序列化得到我们的方法(2步骤封装的)和RDD,然后执行。传入的是迭代器
[Scala] 纯文本查看 复制代码
  
val (rdd, func) =  ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)]( ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
metrics = Some(context.taskMetrics)
func(context, rdd.iterator(partition, context))
  

三,总结
RDD.foreach(foreachFunction)
RDD.foreachPatition(foreachPartitionFunction)
经过第二步的分析我们可以理解,展开之后实际上就是
RDD的每个分区的iterator(集合):
iterator.foreach(foreachFunction)
foreachPartitionFunction(iterator)
这就很明显了,假如我们的Function中有数据库,网络TCP等IO链接,文件流等等的创建关闭操作,采用foreachPatition方法,针对每个分区集合进行计算,更能提高我们的性能。


0 个回复

您需要登录后才可以回帖 登录 | 加入黑马