黑马程序员技术交流社区

标题: 【西安校区】MapReduce模型分析 [打印本页]

作者: 就业高冷派    时间: 2019-3-27 16:47
标题: 【西安校区】MapReduce模型分析
摘要:
MapReduce是一个编程模型以及用来处理和生成大数据集的一个相关实现。用户通过描述一个map函数,处理一组key/value对,进而生成一组key/value对的中间结果,然后描述一个reduce函数,将具有相同key的中间结果进行归并。正如论文所表明的,很多现实世界中的任务都可以用这个模型来表达。

以这种函数式风格写出来的程序在一个由普通机器组成的集群上自动的进行并行化和执行。由一个运行时系统来关注输入数据的划分细节,在机器集合上的程序执行调度,处理机器失败以及管理所需要的机器间的通信。这就允许那些没有并行分布式系统编程经验的程序员很容易的使用大型分布式系统的资源。

我们的MapReduce实现运行在一个有很多普通机器组成的集群上,而且具有高扩展性:一个典型的MapReduce计算将会在一个数千台机器的集群上处理很多T的数据。对于程序员来说,这个系统很好用,目前已经有数百个MapReduce程序实现,在Google的集群上每天有上千个MapReduce job在跑。

1.导引
在过去的五年来,作者和Google的其他工程师已经实现了数百了用于特殊目的在大量原始数据(比如爬虫爬的文档,web访问日志等等)上进行的运算。为了计算各种类型的衍生数据:比如倒排索引,网页文档的图结构的各种不同表示,每个host的网页数,给定的一天中最常查询集合。大部分这样的计算在概念上都是很直接的。然而由于输入数据通常是很庞大的,因此为了能在合理的时间内结束,计算就不得不分布在成百上千台机器上执行。如何并行化计算,分布数据,处理错误都会使得原本简单的计算需要大量额外的代码去处理这些问题。

为了应对这种复杂性,我们设计了一种抽象,使得我们可以表达我们需要执行的这种简单的运算,而将并行化,容错,数据分布,负载平衡这样的细节封装在库里。我们的抽象源于Lisp以及其他函数式编程语言中的map-reduce原语。我们发现我们大部分的计算都是首先在输入的每条记录上执行一个map操作以产生一个key/value的中间结果集合,然后为了得到相应的派生数据,对那些具有相同key的值应用一个reduce操作。通过使用由用户描述的map和reduce操作组成的函数式模型,使得我们很容易的进行计算并行化,同时使用重新执行作为基本的容错机制。

文章的主要贡献就是提供了一个允许对大规模计算进行自动并行化以及数据分布的简单有力的接口。同时提供了一个可以在普通pc机组成的大集群上达到很高的性能针对该接口的实现。

第2节描述了基本的编程模型并且给出了几个简单例子。第3节描述了一个面向我们的基于集群的计算环境的该接口的实现。第4节描述了该模型中我们认为很有用的几个概念。第5节对我们的实现通过几个task进行了测试。第6节介绍了MapReduce在Google内部的使用,包括使用它重写我们的产品索引系统的一些经验。第7节讨论了相关的以及未来的工作。

2.编程模型
计算有一个key/value输入对集合,产生一系列的输出key/value对。Mapreduce库的用户通过两个函数:Map和Reduce来表达这个计算。

Map,由用户编写,有一个输入对,产生一集key/value对的中间结果。Mapreduce库将具有相同key(比如I)的那些中间值组织起来,然后将它们传给Reduce函数。

Reduce函数,也是由用户编写,接受一个中间值key(比如I),以及对应于该key的value集合作为输入。它将这些value归并起来形成一个可能更小的value集合。通常每个Reduce调用产生0个或者1个输出值。中间值的value集合是通过一个迭代器来提供给用户的Reduce函数。这允许我们能处理那些太大以至于无法一次放入内存的value列表。

2.1例子

考虑在一个大文档集合中计算单词出现频率的问题。用户可以用类似如下伪代码的方式来编写代码。

map(String key, String value):
      // key: document name
      // value: document contents
      for each word w in value:
      EmitIntermediate(w, "1");

reduce(String key, Iterator values):
      // key: a word
      // values: a list of counts
      int result = 0;
      for each v in values:
      result += ParseInt(v);
      Emit(AsString(result));

Map函数输出每个单词及其相应出现次数(在这个简单例子中,就是1)reduce函数将所有的次数加起来然后为每一个单词输出它。

另外用户还需要向一个MapReduce描述对象中填写输入输出文件名称以及一些可选的参数。然后用户调用Mapreduce函数,将该描述对象传给它。用户代码需要链接Mapreduce库(采用c++实现)。附录A包含该例子的完整代码。

2.2类型

尽管前面的例子是以字符串类型作为输入输出,概念上来说用户提供的map和reduce函数有如下的类型关联;

Map:      (k1,v1)         -> list(k2,v2)

Reduce:  (k2,list(v2))  -> list(v2)

即输入的key和value与输出的key value来自于不同的域,另外中间结果的key value与输出的key value具有相同的域。{->前后分别代表了输入输出,所以list(k2,v2)代表了中间结果,可以看到中间结果与Reduce的key value名称都是k2,v2,以表示它们具有相同的域。也就是说k1和k2,v1和v2所代表的实际含义可能是不同的,比如url访问频率计数,map的输入就是<logname,log>,而中间结果与reduce则是<url,出现次数>}

我们的用户程序以字符串形式传递给或者接受自用户定义函数,将字符串与相应类型的转换交给用户代码处理。

2.3更多的实例

下面有一些可以使用MapReduce进行计算的简单而有趣的例子。

分布式GREP:map函数输出该行,如果它与给定的模式匹配。Reduce函数只需要将给定的中间结果输出。

URL访问频率计数:map函数处理网页访问日志,输出<URL,1>。Reduce函数将相同URL的value加起来,然后输出<URL, total count>对。

网页链接逆向图:map函数输入<target, source>对表示从网页source到target URL的一条链接。Reduce函数将给定Target URL的所有source URL连接到一块,然后输出<target, list(source)>。

Host短语向量:一个term vector是指出现在一个文档或者文档集合中的最重要的单词的<word, frequency>对列表。Map函数对每一个文档输出<hostname, term vector>(host是从该文档对应的url中抽取出来)。Reduce函数接受到一个给定host的所有文档的term vector。它将这些term vector合并,并扔掉不常出现的那些terms,然后输出一个最终的<hostname, term vector>对。

倒排索引:map函数解析每个文档,输出一个<word,docid>对序列。Reduce函数接受一个给定word的所有序列,对相应的docid进行排序,输出一个<word,list(docid)>。所有的输出对集合就形成了一个简单的倒排索引。通过很简单的改动,我们就可以让这个计算同时记住单词的在文档中的出现位置。

分布式排序:map函数从每条记录中提取key,然后简单的输出<key,record>对。Reduce函数原样地输出所有的对。该计算依赖于4.1节描述的划分功能以及4.2节描述的排序属性。

3.实现
对于MapReduce接口可以有很多不同的实现。正确的选择依赖于环境。比如一个实现可能适用于小型共享内存机器,另一个可能适用于一个大的NUMA多处理机,另一个适用于更大的通过网络互联的集群。

本节描述一个面向Google内部广泛使用的计算环境(由普通pc通过以太网交换机连接而成的大集群)的实现。在我们的环境里:

1.       机器主要是运行linux的双核x86处理器,每台机器具有2-4GB内存

2.       使用通用的网络硬件:在机器级别上,通常不是100Mbps就是1Gbps,平均下来,整体的等分带宽要低些。

3.       集群由成百上千台机器组成,因此失败变得很普通

4.       存储是由连接到单个机器上的廉价的IDE硬盘提供的。一个内部开发的分布式文件系统被用来管理存储在硬盘上的数据。文件系统通过备份来为不可靠的硬件提供可用性和可靠性。

5.       用户提交job到调度系统。每个job由一组task组成,job被调度系统映射到集群中的一组可用机器集合上去执行。

3.1 执行概览

通过自动将输入数据划分为M个片段,使得Map调用可以跨越多个机器执行。这些输入片段可以被不同的机器并行处理。Reduce调用的分布,是通过使用一个划分函数(比如hash(key) mod R)将中间结果的key的值域空间划分为R个片段。片段的个数R以及划分函数都是由用户描述的。

图1展示了一个MapReduce操作在我们的实现中的整体流程。当用户程序调用MapReduce函数时,将会产生如下的动作序列(图中的标号与如下描述中的数字相对应):



1.       用户程序中的MapReduce库首先将输入文件切分为M个片段(每个片段大小通常是16MB到64MB,该大小用户可以通过一个配置参数控制)。然后在一组机器集上启动该程序的所有拷贝。

2.       在这些程序拷贝中有一个是特殊的:the master。其余的是称为worker,由master为它们分配任务。总共有M个map task和R个reduce task需要分配。Master选择空闲的worker,给它们每个分配一个map或者reduce task。

3.       被分配了map task的worker读取相应的输入片段内容。它从输入中解析出key/value对的集合,将每个对传递给用户定义的map函数处理。由map函数生成的中间结果被缓存在内存里。

4.       被缓存的那些对,通过划分函数被分成R个区域,然后周期性的被写入本地磁盘。然后将这些缓存对在本地磁盘上的位置返回给master,master再负责将这些位置信息传递给reduce worker。

5.       当一个reduce worker被master通知了这些位置信息后,它就使用RPC调用去map worker的本地磁盘里读取这些缓冲数据。当一个reduce worker已经读取了所有的缓冲数据后,它就将它们根据key值进行排序,以让具有相同key值的被组织在一块。排序是需要的因为通常很多不同的key值会被映射到同一个reduce task。如果中间结果的数据量太大以至于无法放入内存,就需要进行外排序。

6.       Reduce worker在排好序的中间结果数据上迭代,对于碰到的每个唯一的中间key值,它就将该key值,以及与它对应的value集合传递给用户定义的reduce函数。该reduce函数的输出会被append到这个reduce worker的最终输出文件上。

7.       当所有的map task和reduce task完成后,master唤醒用户程序。这时用户程序从MapReduce调用里返回。

成功完成后,MapReduce执行结果将会产生R个输出文件(一个reduce task对应一个,文件名由用户指定)。通常,用户不需要将这R个输出文件合并为一个文件,它们通常会作为另一个MapReduce调用的输入。或者通过另一个可以处理输入是划分为多个文件的分布式应用程序来使用它们。

3.2 master数据结构

Master保存了几个数据结构。对于每个map和reduce task,它会保存它们的状态(空闲,处理中,完成)以及worker所在机器的标识(针对非空闲task)。{注意task与work之间的关系,不是一对一的,一个worker可能处理多个task}

Master是将中间结果文件位置从map task传递到reduce task的渠道。因此对于每个完成的map task,master会保存由它产生的R个中间结果文件的大小及位置。当map task结束后,将会收到对于这些位置和大小信息的更新。这些信息又会被逐步推送给那些包含正在处理中的reduce task的worker。

3.3 容错

因为MapReduce库是设计用来帮助在成百上千台集群上处理大量数据的,所以这个库就必须能够优雅地容忍机器失败。

Worker失败

Master周期性的ping每个worker。如果在一定时间内没有收到某个worker的响应,就会把它标记为失败。由该worker执行完成的那些map task{注意reduce task不需要}都必须重置为空闲状态。这样,它们就又可以被调度到其他的worker上重新执行。类似的,那些该woker上执行中的任何map reduce task{注意reduce task也需要},必须重置为空闲状态,重新加入调度。

已经完成的map task需要重新执行,是因为它们的输出被存储在失败的那台机器的本地磁盘上,因此就变成了不可访问的。已经完成的reduce task不需要重新执行,是因为它们的输出存放在一个全局文件系统上。

当一个map task首先由work A执行,然后又有worker B执行(因为A失败了)。所有正在执行reduce task的worker都会收到该重新执行的通知。任何已经不能从worker A读数据的reduce task都将会从worker B读取。

MapReduce可以很有效的应对大规模的worker失败。比如,在一个MapReduce操作期间,在一个运行中的集群上的网络维护可能导致80台机器几分钟内同时无法访问。MapReduce master简单的重新执行那些不可达机器上的任务。继续推进整个计算过程,最后完成该MapReduce操作。

Master失败

很容易让master写入上面描述的数据结构的周期性检查点。如果master死掉后,就可以从上次的检查点状态开始启动一个新的拷贝。然而,由于只有一个master,而且它的失败也是不太可能的{很明显如果有多个master,那么失败的概率就会大大上升},因此在我们当前的实现中,如果master失败我们就结束MapReduce计算。在这种情况下,客户端可以进行检查,如果需要可以重试它们的MapReduce操作。

失败出现时语义

当用户提供的map和reduce函数,针对它们的输入是一个确定性函数时,我们的分布式实现应该与整个程序串行执行时产生相同的输出。

我们通过map和reduce task输出的原子性提交来实现这个属性。每个执行中的task将它们的输出写入私有的temp文件里。一个Reduce task产生一个这样的文件,一个map task产生R个这样的文件。当一个map task完成后,worker会给master发送一个包含这个R个temp文件名称的消息。如果master收到一个已经完成的map task的完成消息,它会忽略该消息。否则,它会将这个R个文件的名称记录在自己的一个数据结构中。

当reduce task完成后,reduce worker会自动把temp输出文件重命名为最终的输出文件。如果相同的reduce task在多个机器上执行,多个重命名操作将会在同一个最终输出文件上执行。我们依赖于底层文件系统提供的原子性的重命名操作来保证最终的文件系统中只会包含一个reduce task执行产生的数据。

map和reduce函数是确定性的以及等价于相同情况下串行执行结果的语义,主要优点是使得程序员可以很容易地理解程序的行为。当map和reduce操作不是确定性的时候,我们提供了虽然弱些但是仍然合理的语义。在出现不确定性操作时,一个特殊的reduce task R1的输出等价于该非确定性程序针对R1的串行执行输出。但是对于另一个reduce task R2,就可能对应另一个不同的非确定性程序的串行执行结果。

考虑map task M和reduce task R1和R2.让e(Ri)代表已经完成的Ri的执行过程。语义可能会变得更弱,因为e(R1)可能读取了M某次执行的输出结果,而e(R2)可能读取了M另一次执行的输出。{首先M是不确定性的,其次M可能被重新执行过,这样R1 R2虽然读的是同一个task的输出,但是可能读取了不同的输出结果}。

3.4 局部性

在我们的计算环境中,网络带宽是相对宝贵的。我们通过利用输入数据(由GFS管理)是存储在机器的本地磁盘上的这一事实来节省网络带宽。GFS将每个文件划分为64MB大小的块,每个块的几个副本存储在不同的机器上。MapReduce master充分考虑输入文件的位置信息,尽量将一个map task调度到包含相应的输入数据副本的那个机器上。如果不行,就尝试将map task调度到该task的输入数据附近的那些机器(比如让worker所在的机器与包含该数据的机器在同一个网络交换机上)。当在一个集群上运行一个具有很多worker的大型MapReduce操作时,大部分的输入数据都是从本地读取的,很少消耗网络带宽。

3.5 任务粒度

如上所述,我们将map阶段划分为M个片段,将reduce阶段划分为R个片段。理想情况下,M和R都应当远远大于运行worker的机器数目。让每个worker执行很多不同的task可以提高动态负载平衡,也能加速worker失败后的恢复过程:它已经完成的很多map task可以传给所有其他机器。在我们的实现中M和R到底可以多大,有一些实际的限制。因为master必须进行O(M+R)个调度决定以及在内存中保存O(M*R)个状态{即每个map task的R个输出文件的位置信息,总共M个task,所以是M*R}。 (但是关于内存使用的常数因子是很小的:O(M*R)个状态大概由每个map task/reduce 对的一字节数据组成)。

另外,R通常会被用户限制,因为每个reduce task的输出在不同的输出文件中。在实际中,我们通常这样选择M:使每个独立task输入数据限制在16MB到64MB之间(这样上面所说的本地化优化是最有效的)。我们让R大概是我们将要使用的worker机器的几倍。我们通常这样执行MapReduce操作,在有2000个worker机器时,让M = 20000,R = 5000。

3.6 备份任务

一个影响MapReduce操作整体执行时间的最常见的因素是”掉队者”(花费相当长时间去完成MapReduce操作中最后剩下的极少数的那几个task的那台机器)。有很多原因可以导致掉队者的出现。比如:具有一块坏硬盘的机器可能会经历频繁的可修正错误而使得IO性能从30MB/s降低到1MB/s。集群调度系统可能会将那些引发CPU 内存 本地磁盘或者网络带宽等资源竞争的task调度到同一台机器上。我们最近见过的一个错误是由于机器初始化代码中的一个bug引起的处理器缓冲失灵,使得受影响的机器上的计算性能降低了一百倍。

我们有一个可以缓解这种掉队者问题的通用机制。当MapReduce操作接近尾声的时候,master会备份那些还在执行中的task。只要该task的主本或者其中的一个副本完成了,我们就认为它完成了。通过采用这种机制,我们只使计算资源的利用率增长了仅仅几个百分点,但是明显地降低了完成整个MapReduce操作所需的时间。比如,在5.3节描述的排序例子中,如果不启用这个机制,整个完成时间将会增长53%。

4.概念
尽管通过简单书写map和reduce函数提供的基本功能对于我们大部分的应用来说足够了,我们也发现了其中的一些扩展也很有用。这一节,我们就来描述下它们。

4.1 划分函数

MapReduce用户指定他们期望的reduce task(也可以说输出文件)的数目R。任务产生的数据通过在中间结果的key上使用一个划分函数被划分开。系统提供一个使用hash的默认的划分函数(比如 “hash(key) mod R”)。然而在某些情况下,使用关于key的其他函数进行划分更有用。比如有时候输入是URL,我们希望来自相同host的输入可以存放在相同的输出文件上。为了支持这种情况,MapReduce库的用户必须提供一个特殊的划分函数。比如使用”hash(Hostname(urlkey)) mod R”作为划分函数,就可以让来自相同host的所有URL落在同一个输出文件上。

4.2 排序保证

我们保证在一个给定的划分内,作为中间结果的key/value对是按照key值的增序进行处理的。这种有序化保证可以让每个划分的输出文件也是有序的。而这在输出文件格式需要支持按照key的有效的随机查找时非常有用,或者输出用户也会发现让对这些数据进行排序会很方便。

4.3 合并函数

某些情况下,map task产生的中间结果有很多具有相同key的重复值,而且用户指定的reduce函数又满足交换率和结合率。一个很好的例子就是2.1节里描述的wordcount的例子。因为单词频率的分布倾向于遵循Zipf分布,每个map task将会产生成百上千个相同的记录比如<the,1>这样的。而所有的这些又将会通过网络传递给一个reduce task,然后通过reduce函数将它们累加起来。我们允许用户描述一个combiner函数,在数据通过网络发送之前对它们进行部分的归并。

Combiner函数在每个执行map task的机器上这些。通常用来实现combiner和reduce函数的代码是相同的。唯一的不同在MapReduce库如何处理它们的输出。一个reduce函数的输出将会被写到最终的输出文件,而combiner函数的输出会被写到一个将要发送给reduce task的中间结果文件中。

4.4 输入和输出类型

MapReduce库提供了几种不同格式的输入数据支持。比如”text”输入模式:将每一行看做一个key/value对,key是该行的offset,value是该行的内容。另一个支持的模式是一个根据key排序的key/value对的序列。每个输入类型知道如何将它们自己通过有意义的边界划分,然后交给独立的map task处理(比如text模式,会保证划分只会发生在行边界上)。用户可以通过提供一个reader接口的实现来支持新的输入类型。对于大多数用户来说,仅仅使用那些预定义的输入类型就够用了。

一个reader并不是必须从文件读数据。比如可以简单的定义一个从数据库或者是内存中的数据结构中读记录的reader。

与之类似,我们也提供一组输出类型用于控制输出数据格式,同时用户也很容易添加对于新的输出类型的支持。

4.5 副作用

MapReduce的用户发现某些情况下,在map和reduce操作中顺便产生一个文件作为额外的输出会很方便。这些的副作用是原子性以及幂等性依赖于应用程序编写者。通常应用程序编写者会写一个temp文件,一旦它已经生成完毕再将它原子性的重命名。

我们并不为单个task产生的多个输出文件提供原子性的两阶段提交。因此那些具有跨文件一致性需求的产生多个输出文件的task应当是确定性的。这个限制在实际中还没有引起什么问题。

4.6 跳过坏记录

有时候用户代码中的一些bug会导致Map或者Reduce函数在处理某个特定记录时一定会crash。这样的bug会使得MapReduce操作无法车成功完成。通常的处理方法是修复这个bug,但是有时候这样做显得并不灵活。因为bug可能是存在于第三方的库里,但是源代码是不可用的。而且有时候忽略一些记录是可以接受的,比如在一个大数据集上进行统计分析时。我们提供了一种可选的执行模式,在该模式下,MapReduce库会检测那些记录引发了该crash,然后跳过它们继续前进。

每个worker进程安装了一个信号处理器捕获那些段错误和总线错误。在调用用户Map或者Reduce操作之前,MapReduce库使用一个全局变量存储该参数的序列号。如果用户代码产生了一个信号,信号处理器就会发送一个包含该序列号的”last gasp”的UDP包给master。当master发现在同一记录上发生了不止一次失败后,当它在相应的Map或者Reduce task重新执行时,它就会指出该记录应该被跳过。

4.7 本地化执行

在Map和Reduce函数上进行调试会变得很有技巧,因为实际的计算发生在分布式系统上,通常是几百台机器,而且工作分配是有master动态决定的。为了降低debug,profile的难度以及进行小规模测试,我们开发了一个MapReduce库的变更实现,让MapReduce操作的所有工作在本地计算机上可以串行执行。用户可以控制将计算放在特殊的map task上执行。用户通过使用一个特殊的flag调用它们的程序,然后就可以简单的使用他们的调试和测试工具(比如gdb)。

4.8 状态信息

Master运行一个内部的http服务器,然后发布一些用户可以查看的状态页面。这些状态页面展示了计算的进度,比如已经有多少任务完成,多少还在执行中,输入字节数,中间数据的字节数,输出的字节数,处理速率等等。该页面也会包含指向每个task的标准错误和标准输出文件的链接。用户可以使用这些数据来预测该计算还要花费多少时间,是否还需要为该计算添加更多的资源。计算远远低于预取时,这些页面也可以用来发现这些情况。

另外,更高级别的状态页会展示那些worker失败了,当它们失败时在处理哪些map和reduce task。在诊断用户代码中的bug时,这些信息都是很有用的。

4.9 计数器

MapReduce库提供了一些计数器设施来计算各种事件的发生。比如用户代码可能想计算处理的单词的总数,或者被索引的德语文档的个数等等。

为了使用这些设施,用户代码需要创建一个命名计数器对象然后在Map 和/或 Reduce函数中累加这些计数器。比如:

Counter* uppercase;

uppercase = GetCounter("uppercase");

map(String name, String contents):

for each word w in contents:

if (IsCapitalized(w)):

uppercase->Increment();

EmitIntermediate(w, "1");

来自独立worker机器的计数器的值将会周期性的发送给master(通过对master的ping的响应捎带过去)。Master将那些成功的map和reduce task的计数器值聚集,当MapReduce操作结束后,将它们返回给用户代码。当前的计数器值也会在master的状态页面上显示出来,这样用户就可以看到计算的实时进展。在计算计数器值时,master会忽略掉那些重复执行的相同map或者reduce task的值,以避免重复计数。(重复执行可能是由于备份任务的使用或者是task失败引发的重新执行而引起的。)

一些计数器值是由MapReduce库自动维护的,比如已经处理的输入key/vaule 对的个数,已经产生的输出key/vaule 对的个数。

用户发现计数器设施对于MapReduce操作的行为的完整性检查是非常有用的。比如,在某些MapReduce操作中,用户代码可能想确定已产生的输出对的数目是否刚好等于已处理的输入对数目,或者已经被处理的德语文档在已处理的文档中是否在一个合理的比例上。

5.性能
在本节中我们将通过运行在大集群的机器上的两个计算来测量MapReduce的性能。一个计算在大概1TB的数据中搜索给定模式的文本。另一个计算对接近1T的数据进行排序。

这两个程序就可以代表MapReduce用户所写的实际程序中的大部分子集:一类是将数据从一种表现形式转换为另一种表现形式的程序,另一类就是从一个大数据集合中抽取少量感兴趣的数据集。

5.1   集群配置

所有的程序都是在一个由将近1800台机器组成的集群上执行。每台机器有2个打开了超线程的2G Intel Xeon处理器,4GB内存,2个160GB IDE硬盘,一个gigabit 以太网链路。这些机器安排在一个两级的树形交换网络上,根节点具有接近100-200 Gbps的总体带宽。所有机器具有相同的配置,因此在任意两个机器间的往返时间小于1ms。

在4GB内存中,大概1-1.5G内存预留给在集群上运行的其他task。程序在一个周末的下午执行,此时cpu 硬盘 网络接近空闲。

5.2   Grep

Grep程序通过扫描10^10个100字节的记录,查找一个很少出现的三字符模式(该模式出现在92337个记录里)。输入被划分为近似64MB大小的片段(M=15000),整个输出被放在一个文件中(R=1)。



图2展示了整个计算的处理过程。Y轴表示输入数据的扫描速率。伴随这安排用于进行该MapReduce操作的机器数的增多,该速率也在逐渐攀升,当有1764个worker被分配该任务后达到了30GB/s的峰值。当map task结束后,该速率开始下降,大概在80秒的时候基本上降为0。整个计算过程花费了接近150秒,这包括一分钟的启动时间(这个开销主要是由将程序传输给所有worker,与GFS交互以打开1000个输入文件以及得到本地化优化所需要的信息造成的)。

5.3   排序

排序程序对10^10个100字节的记录进行排序(接近1TB数据)。这个程序根据TeraSort Benchmark进行了建模。

排序程序总共由不到50行用户代码组成。Map函数只有3行,将10字节长的排序用key值从一个文本行中抽取出来,然后输出该key,以及原始的文本行,作为中间结果key/value对。我们使用内建的Identity函数作为reduce操作。该函数将中间结果不过任何改变地输出。最后的排好序的结果写到一个具有2个副本的GFS文件集合上(即该程序将会产生2TB的输出)。

与之前的类似,输入数据被划分为64MB的片段(M=15000)。排好序的输出被划分为4000个输出(R=4000)。划分函数使用key的字节表示来将它们划分为R个片段。

对于该benchmark的划分函数建立在对于key值分布的了解上。对于一个通常的排序问题里,我们会增加一个预先进行的MapReduce操作,该操作会收集key值的采样值,然后使用这些key值的采样来计算最终排序序列的划分点。



图3(a)展示了该排序程序的一个正常的处理过程。左上角的图表示输入速率。峰值速率大概是13GB/s,由于所有的map task在200秒前都结束了,所以该速率下降的很快。可以看到,输入速率要小于grep。这是因为排序的map task花费了大概一半的时间和IO带宽将中间结果写入到本地硬盘上。而与之相比,grep的中间结果输出几乎可以忽略不及。

左边中间的图展示了数据从Map task向reduce task的网络传输速率。当第一个map task完成后,shuffling就开始了。图中的第一个峰值是由于第一批的1700个reduce task都启动后产生的(整个MapReduce操作被分配给大概大概1700个机器,每个机器同一时刻最多执行一个Reduce task)。大体上在300秒的时候,这一批的reduce task结束,然后启动了剩余的reduce task的shffling过程。大概在600秒时,这些shuffling才结束。

左边最底下的图形展示了reduce task将排好序的数据写入最终输出文件的速率。在第一次的shuffling的结束与数据写入开始之间存在一个延时,是因为机器此时正忙着对中间数据进行排序。写入过程在一段时间内大概持续着大概2-4GB/s的速率。所有的写入大概在850秒的时候结束。假设启动的花费,整个计算花费了891秒。这接近于当前Terasort benchmark的最快结果1057秒。

另外需要指出的是:输入速率比shuffle速率和输出速率高是因为我们的本地化优化(大部分的数据都是从本地硬盘读取的,这就绕过了网络带宽的限制)。Shuffle速率比输出速率高是因为输出阶段要写两份拷贝(保存两份是为了可靠性和可用性){这两份拷贝是需要耗费网络带宽的}。写两个副本是因为这是我们的底层文件系统提供的可靠性和可用性机制。如果底层文件系统使用了erasure code而不是副本,对于写数据的网络带宽需求将会减少。

5.4   任务备份的影响

在图3(b),我们展示了一个没有开启任务备份的排序程序的执行过程。执行流类似与我们在图3(a)里看到的那样。除了在繁重的写活动出现后出现了一个长尾。在960秒时,只剩下5个reduce task还没有完成。然而这些掉队者,在300秒后才完成。整个计算花费了1283秒,增加了44%。

5.5   机器失败

图3(3),展示了我们在计算执行几分钟后,杀掉1746个worker里面的200个后的执行过程。底层的集群调度器,立刻重启在这些机器上的worker进程(因为只是进程被杀掉了,机器仍然是可用的)。

死掉的worker作为负的输入速率进行显示,因为前面以及完成的map task的工作都消失了需要重新执行。Map task的重新执行相对较快。加上启动时间,整个计算过程在933秒的时候结束,仅仅比正常情况下的执行时间增加了5%。

6.经验
在2003年2月,我们写出了第一版的MapReduce库,2003年8月对它进行了很多包括本地化优化,跨机器的任务执行的动态负载平衡等等在内的改进。从那时起,我们欣喜的发现MapReduce库可以如此广泛地应用在我们工作中的各种问题上。目前它已经在Google内部应用在广泛的领域上:

大规模机器学习问题

用于Google新闻和购物的聚类问题

找到最流行的查询词

为了实验或者产品从网页中抽取属性(比如为了本地化搜索从大量网页中抽取地理位置)

大规模图形计算



图4展示了过去的时间里,提交到我们的源代码管理系统中的MapReduce程序的数目。从2003年的0到2004年9月接近900个。MapReduce之所以如此成功,是因为它使得在半小时内写出一个简单地可以在数千台机器上跑的程序成为可能。这大大加速了我们的开发和原型周期。此外,它还使得没有分布式或者并行系统编程经验的程序员可以很容易地使用大量的计算资源。



在每个job结束时,MapReduce库还会记录该job使用的计算资源的统计信息。表1,我们展示了2004年8月,在Google内部运行MapReduce job的一个子集的一些统计信息。

6.1   大规模索引

目前为止,我们一个最重要的MapReduce应用就是用它完全重写了产品索引系统,该系统为Google的网页搜索服务产生所需要的数据结果。索引系统以一个由爬虫抓取的存储在GFS上的很大的文档集合作为输入,总共数据量要超过20TB。索引流程由5到10个MapReduce操作组成。通过使用MapReduce(而不是使用之前版本的索引系统所使用的自适应的分布式传输)有如下几个优点:

索引代码很简单,少而且容易理解。因为用于容错,分布和并行化的代码都隐藏在了MapReduce库中。比如,我们通过使用MapReduce将原来的一个计算过程的代码量从3800行降低到了700行。

MapReduce库的性能以及足够好了,这样我们就能将不相关地计算分离,而不是为了降低额外的传输费用而将它们合在一块。这使得我们很容易改变索引处理过程。比如过去在旧系统中可能需要几个月才能完成的变更,现在在新的系统中几天就可以完成。

索引处理流程变得很容易操作。因为大部分由于机器失败,慢机器以及网络引发的问题都由MapReduce库自动处理掉了,不需要进行额外的干预。另外也很容易通过给索引系统增加新机器来提高性能。

7.相关工作
已经有很多系统提供了严格的编程模型,使用了很多限制来进行计算的并行化。MapReduce模型可以看做是基于我们的在现实中的海量计算经验,对这些模型的一个简化和提炼。更重要的是,我们提供了一个可以扩展到数千个处理器上的容错实现。与之相比,大部分的并行处理系统只是在小规模集群上实现的,将机器错误处理交给程序员。

大同步模型和一些MPI实现为简化程序员编写并行程序提供了更高级别的抽象。这些系统与MapReduce的一个关键不同就是MapReduce使用了一个限制性的编程模型来为用户程序提供自动地并行化和透明的容错机制。

我们的本地化优化策略主要源于这样的一些技术,比如active disks,在那里为了降低IO或者网络的数据传输,计算被放到那些靠近本地硬盘的处理元素中执行。我们是在由少量硬盘直接连接的PC上运行而不是在一个磁盘控制处理器上运行,但是策略是类似的。

我们的任务备份机制类似于Charlotte系统中使用的eager调度机制。简单eager调度机制的一个缺点是如果给定的task引发了重复的失败,整个计算就无法完成。我们通过跳过坏记录的方式解决了这样的问题。

MapReduce实现依赖于内部开发的一个集群管理系统,它负责在一个机器集合上分布调度用户任务。尽管不是本文关注的重点,该集群管理系统类似于Condor。

作为MapReduce库的一部分的排序设施在操作过程上类似于Now-sort。源机器(map task)将数据划分进行排序,然后将每份传递给一个R个reduce worker中的一个。每个reduce worker在本地进行排序(如果可能的话就仅使用内存排序)。当然NOW-sort并不包含使得我们的库应用广泛的Map和Reduce函数。

Rive提供了一个进程间通过分布式队列进行数据传输的编程模型。像MapReduce一样,River尽量提高系统的平均性能,即使是由于硬件异构或者系统扰动出现了非对称的情况。River通过仔细的硬盘和网络传输调度来达到平衡的完成时间。MapReduce使用了不同的策略。通过限制编程模型,MapReduce框架能将问题划分为大量的细粒度task。这些task可以在可用的worker上进行动态的调度,这样跑的快的worker就可以处理更多的task。该编程模型也允许我们在job快结束的时候调度task进行冗余的执行,这样大大减少了非对称出现时的完成时间。

BAD-FS有一个与MapReduce完全不同的编程模型。与MapReduce不同,它的目标是降低在广域网上的job的执行时间。但是,它们具有两个基本的相同点:1.都采用了冗余执行从失败导致数据丢失中快速恢复 2.都采用了本地化优化以降低数据在网络上的传输。

TACC是一个设计用户简化构建高可用网络服务的系统。与MapReduce类似,它依赖于重新执行作为实现容错的一个机制。

8.总结
MapReduce编程模型已经因各种目的在Google内部成功使用。我们将这种成功归为几个原因。首先,模型很容易使用,即使对于没有分布式编程经验的程序员来说也是,因为它隐藏了并行化,容错,本地化优化,负载平衡的细节。第二,大量的问题可以简单地用MapReduce计算来表达。比如MapReduce被用来为Google的网页搜索服务,排序,数据挖掘,机器学习很多其他的系统生成数据。第三,我们开发了一个可以扩展到数千台机器上MapReduce实现。该实现可以充分利用机器的资源,因此很适合用来处理在Google碰到的很多大规模计算问题。

通过这项工作我们也学到了很多。首先,通过限制编程模型可以使计算的并行化和分布很简单,同时也能让它容错。第二,网络带宽是一种稀缺资源。我们系统中大量的优化都是为了降低网络传输数据量:本地化优化允许我们从本地磁盘上读数据,将单份拷贝的中间数据写入本地磁盘节省了网络带宽。第三,冗余执行能用来降低慢机子的影响,以及用来处理机器失败和数据丢失。







欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) 黑马程序员IT技术论坛 X3.2