A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

为什么要MapReduce
1)海量数据在单机上处理因为硬件资源限制,无法胜任
2)而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度
3)引入mapreduce框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将分布式计算中的复杂性交由框架来处理。
4)mapreduce分布式方案考虑的问题
(1)运算逻辑要不要先分后合?
(2)程序如何分配运算任务(切片)?
(3)两阶段的程序如何启动?如何协调?
(4)整个程序运行过程中的监控?容错?重试?
分布式方案需要考虑很多问题,但是我们可以将分布式程序中的公共功能封装成框架,让开发人员将精力集中于业务逻辑上。而mapreduce就是这样一个分布式程序的通用框架。
MapReduce进程
一个完整的mapreduce程序在分布式运行时有三类实例进程:
1)MrAppMaster:负责整个程序的过程调度及状态协调
2)MapTask:负责map阶段的整个数据处理流程

3)ReduceTask:负责reduce阶段的整个数据处理流程

MapReduce程序运行流程分析

1)在MapReduce程序读取文件的输入目录上存放相应的文件。
2)客户端程序在submit()方法执行前,获取待处理的数据信息,然后根据集群中参数的配置形成一个任务分配规划。
3)客户端提交job.split、jar包、job.xml等文件给yarn,yarn中的resourcemanager启动MRAppMaster。
4)MRAppMaster启动后根据本次job的描述信息,计算出需要的maptask实例数量,然后向集群申请机器启动相应数量的maptask进程。
5)maptask利用客户指定的inputformat来读取数据,形成输入KV对。
6)maptask将输入KV对传递给客户定义的map()方法,做逻辑运算
7)map()运算完毕后将KV对收集到maptask缓存。
8)maptask缓存中的KV对按照K分区排序后不断写到磁盘文件
9)MRAppMaster监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据分区。
10)Reducetask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算。
11)Reducetask运算完毕后,调用客户指定的outputformat将结果数据输出到外部存储。


文字描述完毕,接着就上代码把。本人参照尚硅谷的文档做了一个简单案例。

FlowBean类:package com.pinyougou.Hadoop;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* FlowBean
*
* @author lee.
* @version 1.0
* <p>File Created at 2018-10-11<p>
*/
public class FlowBean implements WritableComparable<FlowBean> {
    private long upFlow;
    private long downFlow;
    private long totalFlow;

    @Override
    public int compareTo(FlowBean o) {
        return this.totalFlow > o.totalFlow ? -1 : 1;
    }

    public FlowBean() {
    }

    public FlowBean(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.totalFlow = upFlow + downFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
        this.totalFlow = this.upFlow + this.downFlow;
    }



    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
        this.totalFlow = this.upFlow + this.downFlow;
    }

    @Override
    public String toString() {
        return upFlow + "\t" + downFlow + "\t" + totalFlow;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(totalFlow);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
        this.totalFlow = in.readLong();
    }
}
FlowDriver类:package com.pinyougou.Hadoop;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* FlowDriver
*
* @author lee.
* @version 1.0
* <p>File Created at 2018-10-11<p>
*/
public class FlowDriver {
    public static void main(String[] args) throws Exception {
     //   System.setProperty("hadoop.home.dir","E:/czbk/hadoop-2.7.4/bin" );

        // 获取配置信息
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // 设置jar的路径
        job.setJarByClass(FlowDriver.class);
        // 关联Mapper和Reducer的类
        job.setMapperClass(FlowMapper.class);
        job.setReducerClass(FlowReducer.class);
        // 设置Mapper阶段的输出
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(FlowBean.class);
        // 设置Reducer阶段的输出
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        // 设置输入输出数据的路径
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // 设置分区
        // job.setPartitionerClass(FlowPartitioner.class);
        // job.setNumReduceTasks(5);

        // 提交
        boolean result = job.waitForCompletion(true);
        System.out.println(result ? "success!" : "fail!");
        System.exit(result ? 0 : 1);
    }
}

FlowMapper类:package com.pinyougou.Hadoop;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
/**
* FlowMapper
*
* @author lee.
* @version 1.0
* <p>File Created at 2018-10-11<p>
*/
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
    private Text outputKey = new Text();
    private FlowBean flowBean = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String line = value.toString();
        String[] fields = line.split("\\s+");

        String phoneNum = fields[1];
        outputKey.set(phoneNum);

        long uploadFlow = Long.parseLong(fields[fields.length - 3]);
        long downFlow = Long.parseLong(fields[fields.length - 2]);
        flowBean.setUpFlow(uploadFlow);
        flowBean.setDownFlow(downFlow);

        context.write(outputKey, flowBean);
    }
}

FlowPartitioner类:package com.pinyougou.Hadoop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* FlowPartitioner
*
* @author lee.
* @version 1.0
* <p>File Created at 2018-10-11<p>
*/
public class FlowPartitioner extends Partitioner<Text, FlowBean> {
    @Override
    public int getPartition(Text key, FlowBean flowBean, int reducerNum) {
        String telePhoneNum = key.toString();
        String area = telePhoneNum.substring(0, 3);
        int partition = 4;
        if ("136".equals(area)) {
            partition = 0;
        } else if ("137".equals(area)) {
            partition = 1;
        } else if ("138".equals(area)) {
            partition = 2;
        } else if ("139".equals(area)) {
            partition = 3;
        }
        return partition;
    }
}

FlowReducer类:package com.pinyougou.Hadoop;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
/**
* FlowReducer
*
* @author lee.
* @version 1.0
* <p>File Created at 2018-10-11<p>
*/
public class FlowReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    private FlowBean outputValue = new FlowBean();

    @Override
    protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
        long total_upflow = 0L;
        long total_downnflow = 0L;
        for (FlowBean flowBean : values) {
            total_upflow += flowBean.getUpFlow();
            total_downnflow += flowBean.getDownFlow();
        }
        outputValue.setUpFlow(total_upflow);
        outputValue.setDownFlow(total_downnflow);
        context.write(key, outputValue);
    }
}


运行main方法时 携带args[]参数指定输出和输入路径,在作者本机地址是输入路径G:\phone1和输出路径G:\phone2
G:\phone1\phone_data.txt的文件内容为
1363157985066         13726230503        00-FD-07-A4-72-B8:CMCC        120.196.100.82        i02.c.aliimg.com                24        27        2481        24681        200
1363157995052         13826544101        5C-0E-8B-C7-F1-E0:CMCC        120.197.40.4                        4        0        264        0        200
1363157991076         13926435656        20-10-7A-28-CC-0A:CMCC        120.196.100.99                        2        4        132        1512        200
1363154400022         13926251106        5C-0E-8B-8B-B1-50:CMCC        120.197.40.4                        4        0        240        0        200
1363157993044         18211575961        94-71-AC-CD-E6-18:CMCC-EASY        120.196.100.99        iface.qiyi.com        视频网站        15        12        1527        2106        200
1363157995074         84138413        5C-0E-8B-8C-E8-20:7DaysInn        120.197.40.4        122.72.52.12                20        16        4116        1432        200
1363157993055         13560439658        C4-17-FE-BA-DE-D9:CMCC        120.196.100.99                        18        15        1116        954        200
1363157995033         15920133257        5C-0E-8B-C7-BA-20:CMCC        120.197.40.4        sug.so.360.cn        信息安全        20        20        3156        2936        200
1363157983019         13719199419        68-A1-B7-03-07-B1:CMCC-EASY        120.196.100.82                        4        0        240        0        200
1363157984041         13660577991        5C-0E-8B-92-5C-20:CMCC-EASY        120.197.40.4        s19.cnzz.com        站点统计        24        9        6960        690        200
1363157973098         15013685858        5C-0E-8B-C7-F7-90:CMCC        120.197.40.4        rank.ie.sogou.com        搜索引擎        28        27        3659        3538        200
1363157986029         15989002119        E8-99-C4-4E-93-E0:CMCC-EASY        120.196.100.99        www.umeng.com        站点统计        3        3        1938        180        200
1363157992093         13560439658        C4-17-FE-BA-DE-D9:CMCC        120.196.100.99                        15        9        918        4938        200
1363157986041         13480253104        5C-0E-8B-C7-FC-80:CMCC-EASY        120.197.40.4                        3        3        180        180        200
1363157984040         13602846565        5C-0E-8B-8B-B6-00:CMCC        120.197.40.4        2052.flash2-http.qq.com        综合门户        15        12        1938        2910        200
1363157995093         13922314466        00-FD-07-A2-EC-BA:CMCC        120.196.100.82        img.qfc.cn                12        12        3008        3720        200
1363157982040         13502468823        5C-0A-5B-6A-0B-D4:CMCC-EASY        120.196.100.99        y0.ifengimg.com        综合门户        57        102        7335        110349        200
1363157986072         18320173382        84-25-DB-4F-10-1A:CMCC-EASY        120.196.100.99        input.shouji.sogou.com        搜索引擎        21        18        9531        2412        200
1363157990043         13925057413        00-1F-64-E1-E6-9A:CMCC        120.196.100.55        t3.baidu.com        搜索引擎        69        63        11058        48243        200
1363157988072         13760778710        00-FD-07-A4-7B-08:CMCC        120.196.100.82                        2        2        120        120        200
1363157985066         13726238888        00-FD-07-A4-72-B8:CMCC        120.196.100.82        i02.c.aliimg.com                24        27        2481        24681        200
1363157993055         13560436666        C4-17-FE-BA-DE-D9:CMCC        120.196.100.99                        18        15        1116        954        200


输出文件G:\phone2\part-r-00000 内容为

13480253104        180        180        360
13502468823        7335        110349        117684
13560436666        1116        954        2070
13560439658        2034        5892        7926
13602846565        1938        2910        4848
13660577991        6960        690        7650
13719199419        240        0        240
13726230503        2481        24681        27162
13726238888        2481        24681        27162
13760778710        120        120        240
13826544101        264        0        264
13922314466        3008        3720        6728
13925057413        11058        48243        59301
13926251106        240        0        240
13926435656        132        1512        1644
15013685858        3659        3538        7197
15920133257        3156        2936        6092
15989002119        1938        180        2118
18211575961        1527        2106        3633
18320173382        9531        2412        11943
84138413        4116        1432        5548








0 个回复

您需要登录后才可以回帖 登录 | 加入黑马