【郑州校区】大数据离线阶段Day4之Mapreduce的排序初步
1. 需求在得出统计每一个用户(手机号)所耗费的总上行流量、下行流量,总流量结果的基础之上再加一个需求:将统计结果按照总流量倒序排序。 2. 分析基本思路: 实现自定义的bean来封装流量信息,并将bean作为map输出的key来传输 MR程序在处理数据的过程中会对数据排序(map输出的kv对传输到reduce之前,会排序),排序的依据是map输出的key。所以,我们如果要实现自己需要的排序规则,则可以考虑将排序因素放到key中,让key实现接口:WritableComparable,然后重写key的compareTo方法。 3. 实现自定义的bean [AppleScript] 纯文本查看 复制代码 public class FlowBean implements WritableComparable<FlowBean>{
private long upFlow;
private long downFlow;
private long sumFlow;
//这里反序列的时候会用到
public FlowBean() {
}
public FlowBean(long upFlow, long downFlow, long sumFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = sumFlow;
}
public FlowBean(long upFlow, long downFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow+downFlow;
}
public void set(long upFlow, long downFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = upFlow+downFlow;
}
@Override
public String toString() {
return upFlow+"\t"+downFlow+"\t"+sumFlow;
}
/这里是序列化方法
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}
/这里是反序列化方法
@Override
public void readFields(DataInput in) throws IOException {
//注意反序列化的顺序跟序列化的顺序一致
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.sumFlow = in.readLong();
}
//这里进行bean的自定义比较大小
@Override
public int compareTo(FlowBean o) {
//实现按照 sumflow 的大小倒序排序
return this.sumFlow>o.getSumFlow()?-1:1;
}
} |
[AppleScript] 纯文本查看 复制代码 public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
Text k = new Text();
FlowBean v = new FlowBean();
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split("\t");
String phoneNum = fields[1];
long upFlow = Long.parseLong(fields[fields.length-3]);
long downFlow = Long.parseLong(fields[fields.length-2]);
k.set(phoneNum);
v.set(upFlow, downFlow);
context.write(k, v);
}
}
public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
FlowBean v = new FlowBean();
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context)
throws IOException, InterruptedException {
long upFlowCount = 0;
long downFlowCount = 0;
for (FlowBean bean : values) {
upFlowCount += bean.getUpFlow();
downFlowCount += bean.getDownFlow();
}
v.set(upFlowCount, downFlowCount);
context.write(key, v);
}
} |
传智播客·黑马程序员郑州校区地址 河南省郑州市 高新区长椿路11号大学科技园(西区)东门8号楼三层
|