A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

【郑州校区】大数据离线阶段Day4之Mapreduce的分区—Partitioner

1. 需求
将流量汇总统计结果按照手机归属地不同省份输出到不同文件中。
2. 分析
Mapreduce中会将map输出的kv对,按照相同key分组,然后分发给不同的reducetask。
默认的分发规则为:根据key的hashcode%reducetask数来分发
所以:如果要按照我们自己的需求进行分组,则需要改写数据分发(分组)组件Partitioner,自定义一个CustomPartitioner继承抽象类:Partitioner,然后在job对象中,设置自定义partitioner: job.setPartitionerClass(CustomPartitioner.class
3. 实现
[AppleScript] 纯文本查看 复制代码
public class ProvincePartitioner extends Partitioner<Text, FlowBean>

    public static HashMap<String, Integer>  provinceMap = new HashMap<String, Integer>();

    

    static{

        provinceMap.put("134", 0);

        provinceMap.put("135", 1);

        provinceMap.put("136", 2);

        provinceMap.put("137", 3);

        provinceMap.put("138", 4);

    }

    

    @Override

    public int getPartition(Text key, FlowBean value, int numPartitions) {

        Integer code = provinceMap.get(key.toString().substring(0, 3));

        

        if (code != null) {

            return code;

        }

        return 5;

    }

 

}
[AppleScript] 纯文本查看 复制代码
public class FlowSumProvince {

    

 public static class FlowSumProvinceMapper extends Mapper<LongWritable, Text, Text, FlowBean>{

        

     Text k = new Text();

     FlowBean  v = new FlowBean();

     

     @Override

     protected void map(LongWritable key, Text value,Context context)

             throws IOException, InterruptedException {

             //拿取一行文本转为String

             String line = value.toString();

             //按照分隔符\t进行分割

             String[] fileds = line.split("\t");

             //获取用户手机号

             String phoneNum = fileds[1];

             

             long upFlow = Long.parseLong(fileds[fileds.length-3]);

             long downFlow = Long.parseLong(fileds[fileds.length-2]);

             

             k.set(phoneNum);

             v.set(upFlow, downFlow);

             

             context.write(k,v);

                

        }

        

    }

    

 

 

 

    public static class FlowSumProvinceReducer extends Reducer<Text, FlowBean, Text, FlowBean>{

        

        FlowBean  v  = new FlowBean();

        

        @Override

        protected void reduce(Text key, Iterable<FlowBean> flowBeans,Context context) throws IOException, InterruptedException {

            

            long upFlowCount = 0;

            long downFlowCount = 0;

            

            for (FlowBean flowBean : flowBeans) {

                

                upFlowCount += flowBean.getUpFlow();

                

                downFlowCount += flowBean.getDownFlow();

                

            }

            v.set(upFlowCount, downFlowCount);

            

            context.write(key, v);

    }

    

    

    public static void main(String[] args) throws Exception{

        

 

        Configuration conf = new Configuration();

        conf.set("mapreduce.framework.name", "local");

 

        Job job = Job.getInstance(conf);

 

        //指定我这个 job 所在的 jar包位置

        job.setJarByClass(FlowSumProvince.class);

        

        //指定我们使用的Mapper是那个类  reducer是哪个类

        job.setMapperClass(FlowSumProvinceMapper.class);

        job.setReducerClass(FlowSumProvinceReducer.class);

//        job.setCombinerClass(FlowSumProvinceReducer.class);

        

        // 设置我们的业务逻辑 Mapper 类的输出 key 和 value 的数据类型

        job.setMapOutputKeyClass(Text.class);

        job.setMapOutputValueClass(FlowBean.class);

        

        // 设置我们的业务逻辑 Reducer 类的输出 key 和 value 的数据类型

        job.setOutputKeyClass(Text.class);

        job.setOutputValueClass(FlowBean.class);

        

        

        //这里设置运行reduceTask的个数

        //getPartition 返回的分区个数 = NumReduceTasks   正常执行

        //getPartition 返回的分区个数 > NumReduceTasks   报错:Illegal partition

        //getPartition 返回的分区个数 < NumReduceTasks   可以执行 ,多出空白文件

        job.setNumReduceTasks(10);

        

        

        //这里指定使用我们自定义的分区组件

        job.setPartitionerClass(ProvincePartitioner.class);

        

        

        FileInputFormat.setInputPaths(job, new Path("D:\\flowsum\\input"));

        // 指定处理完成之后的结果所保存的位置

        FileOutputFormat.setOutputPath(job, new Path("D:\\flowsum\\outputProvince"));

        

        boolean res = job.waitForCompletion(true);

        System.exit(res ? 0 : 1);

        

    }

 

 }

}
传智播客·黑马程序员郑州校区地址
河南省郑州市 高新区长椿路11号大学科技园(西区)东门8号楼三层
联系电话 0371-56061160/61/62
来校路线  地铁一号线梧桐街站A口出

0 个回复

您需要登录后才可以回帖 登录 | 加入黑马