A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

© wuqiong 金牌黑马   /  2018-7-24 10:00  /  1387 人查看  /  7 人回复  /   0 人收藏 转载请遵从CC协议 禁止商业使用本文

1.MapReduce的核心之shuffle详解

上一篇中我们介绍了MapReduce是什么,以及MapReduce的运行过程,其中在运行过程中主要分为Map端与Reducer端,MapReduce计算模型主要完成了映射与化简,在这其中,有一个最重要的过程那就是其核心——shuffle,shuffle翻译过来也就是混洗。可能大家比较熟知的是JAVA API中的Collections.shuffle(List)方法,这个方法会随机的打乱参数list里的元素顺序。我们先来看看官方给出的MapReduce里面的shuffle,图示如下:

            

在Hadoop这样的集群中,大部分的Map任务与Reducer任务是在不同的节点上运行的,这就会导致Reducer从不同的Map节点去拷贝输出的数据,这样就可能造成巨大的网络消耗,这种网络消耗是正常的,不可避免,我们能做的只有减少一些不必要的网络消耗。从基本的要求来说,我们希望shuffle为我们解决:

(1)能够将Map端输出的数据完整的映射到Reduce端,以保证Reducer能够完整的获取Map端的输出数据

(2)Reduce在跨节点好从Map端获取数据时,尽可能减少不必要的网络消耗

(3)尽量减少磁盘IO

面对上述,如果是我们自己设计shuffle,那么目标是什么呢?我觉得能优化的地方就是减少在网络中传输大量数据的消耗尽量使用内存,而不是磁盘。

在上一篇的MapReduce运行过程的图示中,我们已经进行了分析,下面我们结合shuffle,进一步的分析一下Map端与Reduce端的运行过程。其中shuffle横跨Map端与Reducer端

Map端如下:


上图描述的是某一个Map的运行情况,对应了官网给出图片的左半部分,Map端的shuffle对应了partition(分区),sort(排序),merge(归并)。

整体来说,分为4个部分。每一个Map都有一个内存缓冲区,存储着Map的输出结果,当缓冲区内数据的数量达到百分之80时,溢出到磁盘的一个临时文件,当整个Map任务结束之后,再对磁盘中饭的这个Map所产生的所有临时文件进行合并,产生最终的正式输出文件,以备Reduce的调用。

每部分的说明如下:

第一,在Map任务执行时,他的输入数据来源于分布式文件系统HDFS的block,在输入Map之前,会先对块进行切片,处理原理是最小切片minSize大小是1,最大切片minSplit大小是Long.Max,数据块大小blockSize取3者中的中间值,我们假设这输入的数据是“hello world hello”。

第二,通过Map的map函数处理之后,输出<K,V>对,也就是<hello,1>,<world,1>,<hello,1>,Key是单词,Value是数量1。然后输出的Key通过shuffle的Partitioner进入到了不同的分区(默认的分区算法是对Key进行Hash),分区是为了告诉数据进入到哪哪一个Reducer区中。 MapReduce提供Partitioner接口,我们可以自定义属于自己任务的分区函数。在我们这个例子中,hello经过partition之后返回0,也就是把这个值交给第一个Reducer来进行数据处理.。

第三,已经分区之后的数据进入到了内存缓冲区,内存缓冲区的大小是有限制的,默认是100M,如果Map端输出的数据在缓冲区中已经达到了百分之80(100 * 0.8=80),之后,就会开启溢写线程(Spill),将数据溢写到磁盘的临时目录下,当溢写线程启动之后,会对着要溢出的80M数据进行排序,默认是按照字典顺序进行排序。在这里,我们先想一下,由于Map任务的输出是要发送到不同的Reducer端的,而内存缓冲区没有将相同的数据例如<hello,1>,<hello,1>进行合并,如果发送到网络中,这就必然造成了巨大的网络消耗,为了减少网络消耗,我们把Map任务输出的相同的Key提前进行合并,例如将<hello,1>,<hello,1>合并成<hello,2>这个过程就叫做Combiner,也就做Map端的reduce。说白了,就是一个Reducer提前执行,只不过是发生在Map端。那哪些场景才能使用Combiner呢?从这里分析,Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。所以从我的想法来看,Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等(注意:求平均等不能用Combiner)。Combiner的使用一定得慎重,如果用好,它对任务执行效率有帮助,反之会影响Reducer的最终结果。

第四,Merge(归并),将具有相同Key值的数据进行合并,例如<hello,1>,<hello,1>合并成<hello,[1,1]>,写到磁盘上。特别注意:

每次溢写都会在磁盘的临时目录下生成一个临时文件,如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个溢写文件存在。当Map任务完成的时候,也会对这些溢写文件中的数据进行归并(Merge),最终生成一个文件,例如文件1中<hello,1>,文件2中<hello,2>,文件3中<hello,3>合并之后就是<hello,[1,2,3]>。注意,如果这时候设置了Combiner则会执行来合并相同的Key。

至此,Map端的所有工作全部完成。简单的说,就是在不停的在读数据,输出数据,分区,排序,合并


Reducer端如下:

Shufle在Reducer端总结起来一共三步,Copy,Merge,Reduce的输入文件

第一,Copy阶段。Reducer会开启一些Copy线程,通过HTTP的方式获取Map端的输出文件。

第二,Merge阶段。这里的Merge如map端的Merge动作,只是数组中存放的是不同Map端Copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比Map端的更为灵活,它基于JVM的heap size设置,因为Shuffle阶段Reducer不运行,所以应该把绝大部分的内存都给Shuffle用。这里需要强调的是,Merge有三种形式:1)内存到内存  2)内存到磁盘  3)磁盘到磁盘。默认情况下第一种形式不启用,让人比较困惑,是吧。当内存中的数据量到达一定阈值,就启动内存到磁盘的Merge。与Map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种Merge方式一直在运行,直到没有Map端的数据时才结束,然后启动第三种磁盘到磁盘的Merge方式生成最终的那个文件。

第二,Reduce的输入文件。不断的Merge之后,形成一个最终的文件,这个文件有可能在内存,也有可能存放在磁盘上,如果存在内存中,那么就直接作为Reducer的输入,但是默认情况下是存储在磁盘上的。当Reducer的输入文件已经确定,整个Shuffle才会最终的结束。然后就是Reducer执行,把最后的结果存储在HDFS上.




7 个回复

倒序浏览
奈斯,优秀
回复 使用道具 举报
回复 使用道具 举报
奈斯,优秀
回复 使用道具 举报
回复 使用道具 举报
回复 使用道具 举报
回复 使用道具 举报
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马