A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

需求:统计输出日志里面所有的人(手机号)访问每个网站所耗上下行流量及总和


* 本案例实现的功能:演示自定义数据类型如何实现hadoop的序列化接口
1、该类一定要保留空参构造函数
2、write方法中输出字段二进制数据的顺序  要与  readFields方法读取数据的顺序一致

梳理流程:

1、创建一个访问的流量bean

2、先写一个mapper

3、再写一个Reduce

4、还要一个jobsubmitter来运行测试



流量bean


public class FlowBean implements Writable {

        private int upFlow;
        private int dFlow;
        private String phone;
        private int amountFlow;

        public FlowBean(){}
       
        public FlowBean(String phone, int upFlow, int dFlow) {
                this.phone = phone;
                this.upFlow = upFlow;
                this.dFlow = dFlow;
                this.amountFlow = upFlow + dFlow;
        }

        public String getPhone() {
                return phone;
        }

        public void setPhone(String phone) {
                this.phone = phone;
        }

        public int getUpFlow() {
                return upFlow;
        }

        public void setUpFlow(int upFlow) {
                this.upFlow = upFlow;
        }

        public int getdFlow() {
                return dFlow;
        }

        public void setdFlow(int dFlow) {
                this.dFlow = dFlow;
        }

        public int getAmountFlow() {
                return amountFlow;
        }

        public void setAmountFlow(int amountFlow) {
                this.amountFlow = amountFlow;
        }

        /**
         * hadoop系统在序列化该类的对象时要调用的方法
         */
        @Override
        public void write(DataOutput out) throws IOException {

                out.writeInt(upFlow);
                out.writeUTF(phone);
                out.writeInt(dFlow);
                out.writeInt(amountFlow);

        }

        /**
         * hadoop系统在反序列化该类的对象时要调用的方法
         */
        @Override
        public void readFields(DataInput in) throws IOException {
                this.upFlow = in.readInt();
                this.phone = in.readUTF();
                this.dFlow = in.readInt();
                this.amountFlow = in.readInt();
        }

        @Override
        public String toString() {
                 
                return this.phone + ","+this.upFlow +","+ this.dFlow +"," + this.amountFlow;
        }
       
}
mapper:

public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean>{
       
       
        @Override
        protected void map(LongWritable key, Text value, Context context)
                        throws IOException, InterruptedException {//这个逻辑部分自己根据需求来写,这里只是示例
               
                String line = value.toString();
                String[] fields = line.split("\t");
               
                String phone = fields[1];
               
                int upFlow = Integer.parseInt(fields[fields.length-4]);
                int dFlow = Integer.parseInt(fields[fields.length-3]);
               
                context.write(new Text(phone), new FlowBean(phone, upFlow, dFlow));
        }
}
reducer :

public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean>{
        /**
         *  key:是某个手机号
         *  values:是这个手机号所产生的所有访问记录中的流量数据
         *  
         *  <135,flowBean1><135,flowBean2><135,flowBean3><135,flowBean4>
         */
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context)
                        throws IOException, InterruptedException {
                int upSum = 0;
                int dSum = 0;
               
                for(FlowBean value:values){
                        upSum += value.getUpFlow();
                        dSum += value.getdFlow();
                }
                context.write(key, new FlowBean(key.toString(), upSum, dSum));
        }
}
jobsubmitter:

public static void main(String[] args) throws Exception {

                Configuration conf = new Configuration();

                Job job = Job.getInstance(conf);

                job.setJarByClass(JobSubmitter.class);

                job.setMapperClass(FlowCountMapper.class);
                job.setReducerClass(FlowCountReducer.class);
               
                // 设置参数:maptask在做数据分区时,用哪个分区逻辑类  (如果不指定,它会用默认的HashPartitioner)
                job.setPartitionerClass(ProvincePartitioner.class);
                // 由于我们的ProvincePartitioner可能会产生8种分区号,所以,需要有6个reduce task来接收
                job.setNumReduceTasks(8);

                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(FlowBean.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(FlowBean.class);

                FileInputFormat.setInputPaths(job, new Path("F:\\mrdata\\flow\\input"));
                FileOutputFormat.setOutputPath(job, new Path("F:\\mrdata\\flow\\province-output"));

                job.waitForCompletion(true);

        }

---------------------
【转载,仅作分享,侵删】
作者:高辉
原文:https://blog.csdn.net/ZJX103RLF/article/details/89006173
版权声明:本文为博主原创文章,转载请附上博文链接!

1 个回复

倒序浏览
奈斯,感谢分享!
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马