Why MapReduce?
1) A single machine cannot process massive data sets because of hardware limits.
2) Yet extending a single-machine program to run distributed across a cluster greatly increases its complexity and development effort.
3) With the MapReduce framework, developers can concentrate almost entirely on the business logic and leave the complexity of distributed computing to the framework.
4) Questions a distributed MapReduce-style solution has to answer:
(1) Should the computation be split first and merged afterwards?
(2) How are computation tasks assigned (input splits)?
(3) How are the two phases started, and how do they coordinate?
(4) How is the whole job monitored while it runs? How are failures handled and tasks retried?
A distributed solution has to deal with many such concerns, but the functionality common to all distributed programs can be packaged into a framework so that developers can focus on business logic. MapReduce is exactly such a general-purpose framework for distributed programs.
MapReduce processes
A complete MapReduce program runs as three kinds of processes in a distributed environment:
1) MrAppMaster: schedules the whole job and coordinates its state
2) MapTask: handles the entire data-processing flow of the map phase
3) ReduceTask: handles the entire data-processing flow of the reduce phase
Execution flow of a MapReduce program
1) The files to process are placed in the input directory that the MapReduce program reads.
2) Before submit() executes, the client program inspects the data to be processed and, based on the cluster configuration parameters, produces a task allocation plan.
3) The client submits job.split, the jar package, job.xml and related files to YARN, and the ResourceManager in YARN starts the MRAppMaster.
4) Once started, the MRAppMaster uses the job description to compute the number of MapTask instances required, then requests machines from the cluster and launches that many MapTask processes.
5) Each MapTask reads data through the InputFormat specified by the client, producing input KV pairs.
6) The MapTask passes each input KV pair to the user-defined map() method, which applies the business logic.
7) When map() finishes, the resulting KV pairs are collected into the MapTask's in-memory buffer.
8) The KV pairs in the buffer are partitioned by key and sorted, then continuously spilled to disk files (see the partitioning sketch after this list).
9) When the MRAppMaster sees that all MapTask processes have finished, it launches the number of ReduceTask processes specified by the client and tells each ReduceTask which data partition to process.
10) After a ReduceTask starts, it uses the locations reported by the MRAppMaster to fetch the relevant MapTask output files from the machines where the MapTasks ran, merges and re-sorts them locally, then groups KV pairs that share the same key and calls the user-defined reduce() method once per group.
11) When the ReduceTask finishes, it writes the result data to external storage through the OutputFormat specified by the client.
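For step 8 it is worth spelling out the default partitioning rule. The sketch below mirrors the behaviour of Hadoop's built-in HashPartitioner (it is my paraphrase, not code from this post): every map output KV pair is routed to a reduce partition by hashing the key. The custom FlowPartitioner later in this post overrides exactly this method.

// Sketch of Hadoop's default HashPartitioner logic: a record goes to the
// partition given by the key's hash modulo the number of reduce tasks.
public int getPartition(Text key, FlowBean value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
}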
So much for the description; now for the code. I built a simple case study by following the 尚硅谷 tutorial material.
The FlowBean class:
package com.pinyougou.Hadoop;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* FlowBean
*
* @author lee.
* @version 1.0
* <p>File Created at 2018-10-11<p>
*/
public class FlowBean implements WritableComparable<FlowBean> {

    private long upFlow;
    private long downFlow;
    private long totalFlow;

    @Override
    public int compareTo(FlowBean o) {
        // Descending order by totalFlow; Long.compare also handles the equal case correctly
        return Long.compare(o.totalFlow, this.totalFlow);
    }

    // The no-arg constructor is required so the framework can create the bean via reflection
    public FlowBean() {
    }

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.totalFlow = upFlow + downFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
        this.totalFlow = this.upFlow + this.downFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
        this.totalFlow = this.upFlow + this.downFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + totalFlow;
    }

    // Serialization: called by the framework when the bean is shuffled between tasks
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(totalFlow);
    }

    // Deserialization: fields must be read in exactly the order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.totalFlow = in.readLong();
    }
}
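As a quick sanity check on the bean's serialization (this helper is my own addition, not part of the case study), the sketch below writes a FlowBean to a byte buffer and reads it back; it only works because readFields() reads the three longs in exactly the order write() wrote them.

import java.io.*;

// Hypothetical helper class: round-trips a FlowBean through its own
// write()/readFields() methods to confirm the field order matches.
public class FlowBeanRoundTrip {
    public static void main(String[] args) throws IOException {
        FlowBean original = new FlowBean(100L, 200L);

        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        original.write(new DataOutputStream(buffer));

        FlowBean copy = new FlowBean();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));

        // Expected output: 100	200	300 (same as original.toString())
        System.out.println(copy);
    }
}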
The FlowDriver class:
package com.pinyougou.Hadoop;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* FlowDriver
*
* @author lee.
* @version 1.0
* <p>File Created at 2018-10-11<p>
*/
public class FlowDriver {

    public static void main(String[] args) throws Exception {
        // System.setProperty("hadoop.home.dir","E:/czbk/hadoop-2.7.4/bin" );

        // Get the configuration information and create the job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // Set the jar path via the driver class
        job.setJarByClass(FlowDriver.class);

        // Associate the Mapper and Reducer classes
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);

        // Output key/value types of the map phase
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);

        // Output key/value types of the reduce phase (final output)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);

        // Input and output data paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Custom partitioning (optional; see FlowPartitioner below)
        // job.setPartitionerClass(FlowPartitioner.class);
        // job.setNumReduceTasks(5);

        // Submit the job and wait for completion
        boolean result = job.waitForCompletion(true);
        System.out.println(result ? "success!" : "fail!");
        System.exit(result ? 0 : 1);
    }
}
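One pitfall when re-running the job: FileOutputFormat refuses to start if the output directory already exists. A small guard like the sketch below (my own addition; it assumes the FileSystem resolved from the job Configuration) can be dropped into main() before the job is submitted, or you can simply delete the output folder by hand between runs.

// Optional guard, placed after the Configuration is created and before
// FileOutputFormat.setOutputPath(): remove a pre-existing output directory.
// Requires: import org.apache.hadoop.fs.FileSystem;
FileSystem fs = FileSystem.get(configuration);
Path outputPath = new Path(args[1]);
if (fs.exists(outputPath)) {
    fs.delete(outputPath, true);   // true = delete recursively
}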
The FlowMapper class:
package com.pinyougou.Hadoop;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* FlowMapper
*
* @author lee.
* @version 1.0
* <p>File Created at 2018-10-11<p>
*/
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    private Text outputKey = new Text();
    private FlowBean flowBean = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Split one input line on whitespace
        String line = value.toString();
        String[] fields = line.split("\\s+");

        // The phone number is the second column
        String phoneNum = fields[1];
        outputKey.set(phoneNum);

        // Upstream/downstream flow are the 3rd- and 2nd-to-last columns; indexing from
        // the end is needed because the optional domain columns make the count vary
        long uploadFlow = Long.parseLong(fields[fields.length - 3]);
        long downFlow = Long.parseLong(fields[fields.length - 2]);
        flowBean.setUpFlow(uploadFlow);
        flowBean.setDownFlow(downFlow);

        context.write(outputKey, flowBean);
    }
}
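Why index the flow columns from the end of the array? In phone_data.txt some records carry a visited domain (and sometimes a category label) and some don't, so the column count varies from line to line; only the last few columns are stable. Here is a small standalone check against one real record from the data set (my own snippet, not part of the job):

// Hypothetical standalone check of the field indexing used in FlowMapper,
// run against one record from phone_data.txt.
public class FieldIndexDemo {
    public static void main(String[] args) {
        String line = "1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC "
                + "120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200";
        String[] fields = line.split("\\s+");
        System.out.println(fields[1]);                   // 13726230503 (phone number)
        System.out.println(fields[fields.length - 3]);   // 2481  (upFlow)
        System.out.println(fields[fields.length - 2]);   // 24681 (downFlow)
    }
}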
The FlowPartitioner class:
package com.pinyougou.Hadoop;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* FlowPartitioner
*
* @author lee.
* @version 1.0
* <p>File Created at 2018-10-11<p>
*/
public class FlowPartitioner extends Partitioner<Text, FlowBean> {

    @Override
    public int getPartition(Text key, FlowBean flowBean, int reducerNum) {
        // Route each record to a reduce partition by its phone-number prefix
        String telePhoneNum = key.toString();
        String area = telePhoneNum.substring(0, 3);

        // Any prefix other than 136/137/138/139 falls into partition 4
        int partition = 4;
        if ("136".equals(area)) {
            partition = 0;
        } else if ("137".equals(area)) {
            partition = 1;
        } else if ("138".equals(area)) {
            partition = 2;
        } else if ("139".equals(area)) {
            partition = 3;
        }
        return partition;
    }
}
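To actually use FlowPartitioner, uncomment the two partitioning lines in FlowDriver. The number of reduce tasks should match the five possible return values (0 through 4), so each prefix group ends up in its own output file part-r-00000 … part-r-00004:

// In FlowDriver.main(), before submitting the job:
job.setPartitionerClass(FlowPartitioner.class);
// Five reduce tasks, one per partition index returned by getPartition()
job.setNumReduceTasks(5);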
The FlowReducer class:
package com.pinyougou.Hadoop;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* FlowReducer
*
* @author lee.
* @version 1.0
* <p>File Created at 2018-10-11<p>
*/
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

    private FlowBean outputValue = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        // Sum the upstream and downstream flow of all records for one phone number
        long totalUpFlow = 0L;
        long totalDownFlow = 0L;
        for (FlowBean flowBean : values) {
            totalUpFlow += flowBean.getUpFlow();
            totalDownFlow += flowBean.getDownFlow();
        }
        outputValue.setUpFlow(totalUpFlow);
        outputValue.setDownFlow(totalDownFlow);
        context.write(key, outputValue);
    }
}
When running the main method, pass args[] to specify the input and output paths; on my machine the input path is G:\phone1 and the output path is G:\phone2.
The content of G:\phone1\phone_data.txt is:
1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157995052 13826544101 5C-0E-8B-C7-F1-E0:CMCC 120.197.40.4 4 0 264 0 200
1363157991076 13926435656 20-10-7A-28-CC-0A:CMCC 120.196.100.99 2 4 132 1512 200
1363154400022 13926251106 5C-0E-8B-8B-B1-50:CMCC 120.197.40.4 4 0 240 0 200
1363157993044 18211575961 94-71-AC-CD-E6-18:CMCC-EASY 120.196.100.99 iface.qiyi.com 视频网站 15 12 1527 2106 200
1363157995074 84138413 5C-0E-8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200
1363157993055 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
1363157995033 15920133257 5C-0E-8B-C7-BA-20:CMCC 120.197.40.4 sug.so.360.cn 信息安全 20 20 3156 2936 200
1363157983019 13719199419 68-A1-B7-03-07-B1:CMCC-EASY 120.196.100.82 4 0 240 0 200
1363157984041 13660577991 5C-0E-8B-92-5C-20:CMCC-EASY 120.197.40.4 s19.cnzz.com 站点统计 24 9 6960 690 200
1363157973098 15013685858 5C-0E-8B-C7-F7-90:CMCC 120.197.40.4 rank.ie.sogou.com 搜索引擎 28 27 3659 3538 200
1363157986029 15989002119 E8-99-C4-4E-93-E0:CMCC-EASY 120.196.100.99 www.umeng.com 站点统计 3 3 1938 180 200
1363157992093 13560439658 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 15 9 918 4938 200
1363157986041 13480253104 5C-0E-8B-C7-FC-80:CMCC-EASY 120.197.40.4 3 3 180 180 200
1363157984040 13602846565 5C-0E-8B-8B-B6-00:CMCC 120.197.40.4 2052.flash2-http.qq.com 综合门户 15 12 1938 2910 200
1363157995093 13922314466 00-FD-07-A2-EC-BA:CMCC 120.196.100.82 img.qfc.cn 12 12 3008 3720 200
1363157982040 13502468823 5C-0A-5B-6A-0B-D4:CMCC-EASY 120.196.100.99 y0.ifengimg.com 综合门户 57 102 7335 110349 200
1363157986072 18320173382 84-25-DB-4F-10-1A:CMCC-EASY 120.196.100.99 input.shouji.sogou.com 搜索引擎 21 18 9531 2412 200
1363157990043 13925057413 00-1F-64-E1-E6-9A:CMCC 120.196.100.55 t3.baidu.com 搜索引擎 69 63 11058 48243 200
1363157988072 13760778710 00-FD-07-A4-7B-08:CMCC 120.196.100.82 2 2 120 120 200
1363157985066 13726238888 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200
1363157993055 13560436666 C4-17-FE-BA-DE-D9:CMCC 120.196.100.99 18 15 1116 954 200
The output file G:\phone2\part-r-00000 contains:
13480253104 180 180 360
13502468823 7335 110349 117684
13560436666 1116 954 2070
13560439658 2034 5892 7926
13602846565 1938 2910 4848
13660577991 6960 690 7650
13719199419 240 0 240
13726230503 2481 24681 27162
13726238888 2481 24681 27162
13760778710 120 120 240
13826544101 264 0 264
13922314466 3008 3720 6728
13925057413 11058 48243 59301
13926251106 240 0 240
13926435656 132 1512 1644
15013685858 3659 3538 7197
15920133257 3156 2936 6092
15989002119 1938 180 2118
18211575961 1527 2106 3633
18320173382 9531 2412 11943
84138413 4116 1432 5548