MapReduce中的GroupingComparator应用案例
在日常的数据统计分析中,常常会有类似如下的求分组最大值统计需求,用到的数据示例如下:
itemid amount date …
10001 136.6 2015-1-12 …
10001 165.5 2015-1-12 …
10002 122.5 2015-1-12 …
10002 166.88 2015-1-12 …
10003 189.65 2015-1-12 …
10003 198.62 2015-1-13 …
10001 278.6 2015-1-13 …
10001 143.6 2015-1-13 …
需求是求出整个数据集中每一种商品销售额最大的单笔订单,结果如下:
10001 278.60
10002 166.88
10003 198.62
... ...
如果用传统sql来求解,这是极其简单的:
select itemid,max(amount) from t_order group by itemid;
而用mapreduce程序,该如何实现呢?最简单的办法是:
1、在mapper中将日志的每一行解析成键值对: “key: itemid ,value:amount”
2、经过shuffle之后,相同itemid的数据会发送给同一个reducer
3、然后,我们就可以在reducer中遍历某个item的一组values,
4、这一组values对于amount来说是无序的,进而需要在reducer中缓存这一组values,然后排序从而取到这一组values中的最大值。
这个办法固然可行,但是效率不是很高,因为在reducer中针对一组values取最大amount,需要在内存中进行缓存并排序,在数据量大的情况下,会耗费相当多的内存空间和cpu运算资源,甚至可能会内存溢出。
现在,就让我们来思考另一种实现方式,如果能让数据到达reducer时的次序是针对amount的倒序,则我们可以直接取改组values的第一个值即可,如何实现呢?
1、首先,我们构造一个bean<itemid,amount> implements WritableComparable作为mapper输出的key来传递数据,在其compareTo()方法中定义逻辑:按照itemid升序及amount降序,这样一来,mapper输出的数据就会按照amount降序排列,示例如下:
<10001,278.60>
<10001,165.50>
<10001,136.60>
<10002,166.88>
<10002,122.5>
.......
2、但是,这样一来,又带来一个棘手的问题——相同item的bean在shuffle时不一定发往同一个reducer!因为每一个bean(就算是相同itemid)都是一个不同的对象,而默认HashPartitioner分区的逻辑是用bean的hashcode计算分区号。从而,需要自定义一个ItemPartitioner,实现将相同itemid的bean发往同一个reducer,代码如下所示:
class ItemPartitioner extends Partitioner{
int getPartition(bean,numreducertasks){
return bean.getItemid.hashCode() % numreducertasks;
}
}
这样一来,可以保证相同item的数据会到达同一个reducer,并且是按照amount降序排序,如下所示:
<10001,278.60>
<10001,165.50>
<10001,136.60>
.......
3、接下来,就是如何取到这一组values中的最大值。
在默认情况下,reducer会将拿到的数据按照相同key进行聚合,然后对聚合起来的每一组数据调用一次reduce方法,此处麻烦的问题是,这里的每一个key都是一个对象,从而,就算是相同itemid的数据,也不会聚合到一组,而是会逐一地调用reduce()方法进行处理,这样一来,我们也就没办法取到最大值了;
4、要解决这个问题,就得借助GroupingComparator了,其工作机制是这样:
当mapper输出的相同partition的kv数据到达一个Reducer后,会有一个聚合的过程,即将“相同”key的kv聚合到一起(其实质是利用GroupingComparator来对key进行比较),然后将这一组聚合好的kv中最前面的一个kv的key传给reduce方法的入参key,将一个用来遍历这一组kv数据的values的迭代器iterator传给reduce方法的入参iterator。
5、从而,我们可以自定义一个GroupingComparator来定义哪些kv可以聚合成一组,代码示例如下:
public class GroupingComparator extends WritableComparator{
protected GroupingComparator() {
super(Bean.class,true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
Bean kv1 = (Bean) a;
Bean kv2 = (Bean) b;
int cmp = kv1.getItemid().compareTo(kv2.getItemid());
return cmp;
}
}
6、这样一来,虽然不同的bean是不同的对象,但是在进行聚合的时候,根据GroupingComparator ,只要是itemid相同的bean都会算成一组聚合kv,然后这一组聚合kv的最前面一个kv(也就是amount值最大的那一个)会传入reduce方法的入参key,从而,在我们的reduce方法中,只要直接输出这个key就ok了:
@Override
protected void reduce(Bean bean,Iterable<NullWritable> arg1,Context context)throws IOException, InterruptedException {
context.write(bean, NullWritable.get());
}
当然,要想让这个GroupingComparator 生效,还需要在job中进行注册:
job.setGroupingComparatorClass(GroupingComparator.class);
综上所述,该案例需要自定义这几个元素:
自定义的复合key
自定义的partitioner
自定义的GroupingComparator
|
|