黑马程序员技术交流社区

标题: 【广州校区】+【原创】+ MapReduce程序运行流程分析 [打印本页]

作者: 小李哥    时间: 2018-10-11 10:53
标题: 【广州校区】+【原创】+ MapReduce程序运行流程分析
为什么要MapReduce
1)海量数据在单机上处理因为硬件资源限制,无法胜任
2)而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度
3)引入mapreduce框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将分布式计算中的复杂性交由框架来处理。
4)mapreduce分布式方案考虑的问题
(1)运算逻辑要不要先分后合?
(2)程序如何分配运算任务(切片)?
(3)两阶段的程序如何启动?如何协调?
(4)整个程序运行过程中的监控?容错?重试?
分布式方案需要考虑很多问题,但是我们可以将分布式程序中的公共功能封装成框架,让开发人员将精力集中于业务逻辑上。而mapreduce就是这样一个分布式程序的通用框架。
MapReduce进程
一个完整的mapreduce程序在分布式运行时有三类实例进程:
1)MrAppMaster:负责整个程序的过程调度及状态协调
2)MapTask:负责map阶段的整个数据处理流程

3)ReduceTask:负责reduce阶段的整个数据处理流程

MapReduce程序运行流程分析

1)在MapReduce程序读取文件的输入目录上存放相应的文件。
2)客户端程序在submit()方法执行前,获取待处理的数据信息,然后根据集群中参数的配置形成一个任务分配规划。
3)客户端提交job.split、jar包、job.xml等文件给yarn,yarn中的resourcemanager启动MRAppMaster。
4)MRAppMaster启动后根据本次job的描述信息,计算出需要的maptask实例数量,然后向集群申请机器启动相应数量的maptask进程。
5)maptask利用客户指定的inputformat来读取数据,形成输入KV对。
6)maptask将输入KV对传递给客户定义的map()方法,做逻辑运算
7)map()运算完毕后将KV对收集到maptask缓存。
8)maptask缓存中的KV对按照K分区排序后不断写到磁盘文件
9)MRAppMaster监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据分区。
10)Reducetask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算。
11)Reducetask运算完毕后,调用客户指定的outputformat将结果数据输出到外部存储。


文字描述完毕,接着就上代码把。本人参照尚硅谷的文档做了一个简单案例。

FlowBean类:package com.pinyougou.Hadoop;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* FlowBean
*
* @author lee.
* @version 1.0
* <p>File Created at 2018-10-11<p>
*/
public class FlowBean implements WritableComparable<FlowBean> {
    private long upFlow;
    private long downFlow;
    private long totalFlow;

    @Override
    public int compareTo(FlowBean o) {
        return this.totalFlow > o.totalFlow ? -1 : 1;
    }

    public FlowBean() {
    }

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.totalFlow = upFlow + downFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
        this.totalFlow = this.upFlow + this.downFlow;
    }



    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
        this.totalFlow = this.upFlow + this.downFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + totalFlow;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(totalFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.totalFlow = in.readLong();
    }
}
FlowDriver类:package com.pinyougou.Hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* FlowDriver
*
* @author lee.
* @version 1.0
* <p>File Created at 2018-10-11<p>
*/
public class FlowDriver {
    public static void main(String[] args) throws Exception {
     //   System.setProperty("hadoop.home.dir","E:/czbk/hadoop-2.7.4/bin" );

        // 获取配置信息
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // 设置jar的路径
        job.setJarByClass(FlowDriver.class);
        // 关联Mapper和Reducer的类
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        // 设置Mapper阶段的输出
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // 设置Reducer阶段的输出
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 设置输入输出数据的路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 设置分区
        // job.setPartitionerClass(FlowPartitioner.class);
        // job.setNumReduceTasks(5);

        // 提交
        boolean result = job.waitForCompletion(true);
        System.out.println(result ? "success!" : "fail!");
        System.exit(result ? 0 : 1);
    }
}

FlowMapper类:package com.pinyougou.Hadoop;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
/**
* FlowMapper
*
* @author lee.
* @version 1.0
* <p>File Created at 2018-10-11<p>
*/
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    private Text outputKey = new Text();
    private FlowBean flowBean = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\\s+");

        String phoneNum = fields[1];
        outputKey.set(phoneNum);

        long uploadFlow = Long.parseLong(fields[fields.length - 3]);
        long downFlow = Long.parseLong(fields[fields.length - 2]);
        flowBean.setUpFlow(uploadFlow);
        flowBean.setDownFlow(downFlow);

        context.write(outputKey, flowBean);
    }
}

FlowPartitioner类:package com.pinyougou.Hadoop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* FlowPartitioner
*
* @author lee.
* @version 1.0
* <p>File Created at 2018-10-11<p>
*/
public class FlowPartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text key, FlowBean flowBean, int reducerNum) {
        String telePhoneNum = key.toString();
        String area = telePhoneNum.substring(0, 3);
        int partition = 4;
        if ("136".equals(area)) {
            partition = 0;
        } else if ("137".equals(area)) {
            partition = 1;
        } else if ("138".equals(area)) {
            partition = 2;
        } else if ("139".equals(area)) {
            partition = 3;
        }
        return partition;
    }
}

FlowReducer类:package com.pinyougou.Hadoop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
/**
* FlowReducer
*
* @author lee.
* @version 1.0
* <p>File Created at 2018-10-11<p>
*/
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    private FlowBean outputValue = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        long total_upflow = 0L;
        long total_downnflow = 0L;
        for (FlowBean flowBean : values) {
            total_upflow += flowBean.getUpFlow();
            total_downnflow += flowBean.getDownFlow();
        }
        outputValue.setUpFlow(total_upflow);
        outputValue.setDownFlow(total_downnflow);
        context.write(key, outputValue);
    }
}


运行main方法时 携带args[]参数指定输出和输入路径,在作者本机地址是输入路径G:\phone1和输出路径G:\phone2
G:\phone1\phone_data.txt的文件内容为
1363157985066         13726230503        00-FD-07-A4-72-B8:CMCC        120.196.100.82        i02.c.aliimg.com                24        27        2481        24681        200
1363157995052         13826544101        5C-0E-8B-C7-F1-E0:CMCC        120.197.40.4                        4        0        264        0        200
1363157991076         13926435656        20-10-7A-28-CC-0A:CMCC        120.196.100.99                        2        4        132        1512        200
1363154400022         13926251106        5C-0E-8B-8B-B1-50:CMCC        120.197.40.4                        4        0        240        0        200
1363157993044         18211575961        94-71-AC-CD-E6-18:CMCC-EASY        120.196.100.99        iface.qiyi.com        视频网站        15        12        1527        2106        200
1363157995074         84138413        5C-0E-8B-8C-E8-20:7DaysInn        120.197.40.4        122.72.52.12                20        16        4116        1432        200
1363157993055         13560439658        C4-17-FE-BA-DE-D9:CMCC        120.196.100.99                        18        15        1116        954        200
1363157995033         15920133257        5C-0E-8B-C7-BA-20:CMCC        120.197.40.4        sug.so.360.cn        信息安全        20        20        3156        2936        200
1363157983019         13719199419        68-A1-B7-03-07-B1:CMCC-EASY        120.196.100.82                        4        0        240        0        200
1363157984041         13660577991        5C-0E-8B-92-5C-20:CMCC-EASY        120.197.40.4        s19.cnzz.com        站点统计        24        9        6960        690        200
1363157973098         15013685858        5C-0E-8B-C7-F7-90:CMCC        120.197.40.4        rank.ie.sogou.com        搜索引擎        28        27        3659        3538        200
1363157986029         15989002119        E8-99-C4-4E-93-E0:CMCC-EASY        120.196.100.99        www.umeng.com        站点统计        3        3        1938        180        200
1363157992093         13560439658        C4-17-FE-BA-DE-D9:CMCC        120.196.100.99                        15        9        918        4938        200
1363157986041         13480253104        5C-0E-8B-C7-FC-80:CMCC-EASY        120.197.40.4                        3        3        180        180        200
1363157984040         13602846565        5C-0E-8B-8B-B6-00:CMCC        120.197.40.4        2052.flash2-http.qq.com        综合门户        15        12        1938        2910        200
1363157995093         13922314466        00-FD-07-A2-EC-BA:CMCC        120.196.100.82        img.qfc.cn                12        12        3008        3720        200
1363157982040         13502468823        5C-0A-5B-6A-0B-D4:CMCC-EASY        120.196.100.99        y0.ifengimg.com        综合门户        57        102        7335        110349        200
1363157986072         18320173382        84-25-DB-4F-10-1A:CMCC-EASY        120.196.100.99        input.shouji.sogou.com        搜索引擎        21        18        9531        2412        200
1363157990043         13925057413        00-1F-64-E1-E6-9A:CMCC        120.196.100.55        t3.baidu.com        搜索引擎        69        63        11058        48243        200
1363157988072         13760778710        00-FD-07-A4-7B-08:CMCC        120.196.100.82                        2        2        120        120        200
1363157985066         13726238888        00-FD-07-A4-72-B8:CMCC        120.196.100.82        i02.c.aliimg.com                24        27        2481        24681        200
1363157993055         13560436666        C4-17-FE-BA-DE-D9:CMCC        120.196.100.99                        18        15        1116        954        200


输出文件G:\phone2\part-r-00000 内容为

13480253104        180        180        360
13502468823        7335        110349        117684
13560436666        1116        954        2070
13560439658        2034        5892        7926
13602846565        1938        2910        4848
13660577991        6960        690        7650
13719199419        240        0        240
13726230503        2481        24681        27162
13726238888        2481        24681        27162
13760778710        120        120        240
13826544101        264        0        264
13922314466        3008        3720        6728
13925057413        11058        48243        59301
13926251106        240        0        240
13926435656        132        1512        1644
15013685858        3659        3538        7197
15920133257        3156        2936        6092
15989002119        1938        180        2118
18211575961        1527        2106        3633
18320173382        9531        2412        11943
84138413        4116        1432        5548













欢迎光临 黑马程序员技术交流社区 (http://bbs.itheima.com/) 黑马程序员IT技术论坛 X3.2