A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

使用二次排序来实现customers(顾客信息)与orders(订单信息)相关联

使用的是本地测试,并未放到集群(不影响使用)

customers.txt

(cid,name)

1,zpx
2,zpx1
3,zpx2
4,zpx3
5,zpx4

6,zpx5

orders.txt

(id,price,cid)

1,12.5,1
2,12.6,1
3,12.7,2
4,12.8,2
5,12.9,4
6,12.4,3
7,12.3,4
8,12.2,2
9,12.1,1
10,12.0,2
11,22.0,3
12,23.0,4
13,24.5,1

14,26.0,5

1.组合Key


package com.hadoop.mr.join;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
* @author 邹培贤
* @Title: ${file_name}
* @Package ${package_name}
* @Description: 用户订单组合Key
* @date 2018/7/510:06
*/
public class ComboKey implements WritableComparable<ComboKey>{
//    0-customer 1-order
    private int type;
    private int cid;
    private int oid;
    private String customerInfo="";
    private String orderInfo="";
    @Override
    public int compareTo(ComboKey o) {
        int cid0 = o.getCid();
        int type0 = o.getType();
        int oid0 = o.getOid();
        //是否是同一个customer的数据
        if(cid == cid0){
               //同一个customer的两个订单order
            if(type == type0){
                return oid - oid0;
            }else{
                //一个custmoer的一个订单order
                if(type ==0){
                    //customer在前
                    return  -1;
                }else{
                    return  1;
                }

            }
        }else{
            //cid不一样
            return cid - cid0;
        }
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(type);
        out.writeInt(cid);
        out.writeInt(oid);
        out.writeUTF(customerInfo);
        out.writeUTF(orderInfo);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        this.type = in.readInt();
        this.cid = in.readInt();
        this.oid = in.readInt();
        this.customerInfo = in.readUTF();
        this.orderInfo = in.readUTF();
    }

    public int getType() {
        return type;
    }

    public void setType(int type) {
        this.type = type;
    }

    public int getCid() {
        return cid;
    }

    public void setCid(int cid) {
        this.cid = cid;
    }

    public int getOid() {
        return oid;
    }

    public void setOid(int oid) {
        this.oid = oid;
    }

    public String getCustomerInfo() {
        return customerInfo;
    }

    public void setCustomerInfo(String customerInfo) {
        this.customerInfo = customerInfo;
    }

    public String getOrderInfo() {
        return orderInfo;
    }

    public void setOrderInfo(String orderInfo) {
        this.orderInfo = orderInfo;
    }

    @Override
    public String toString() {
        return "ComboKey{" +
                "type=" + type +
                ", cid=" + cid +
                ", oid=" + oid +
                ", customerInfo='" + customerInfo + '\'' +
                ", orderInfo='" + orderInfo + '\'' +
                '}';
    }
}
2.按照cid自定义分区对比器


package com.hadoop.mr.join;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
* @author 邹培贤
* @Title: ${file_name}
* @Package ${package_name}
* @Description: 按照cid自定义分区
* @date 2018/7/510:24
*/
public class CIDPartitioner extends Partitioner<ComboKey,NullWritable>{
    @Override
    public int getPartition(ComboKey key, NullWritable nullWritable, int numPartitions){
        return key.getCid() % numPartitions;
    }
}
3.按照cid自定义分组


package com.hadoop.mr.join;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
* @author 邹培贤
* @Title: ${file_name}
* @Package ${package_name}
* @Description: 自定义分组对比器,按照cid进行分组
* @date 2018/7/510:33
*/
public class CIDGroupComparator extends WritableComparator{
    protected CIDGroupComparator() {
        super(ComboKey.class, true);
    }

    public int compare(WritableComparable a, WritableComparable b) {
        ComboKey k1 = (ComboKey) a;
        ComboKey k2 = (ComboKey) b;
        return k1.getCid() - k2.getCid();
    }
}
4.自定义排序对比器


package com.hadoop.mr.join;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
* @author 邹培贤
* @Title: ${file_name}
* @Package ${package_name}
* @Description: 自定义排序对比器
* @date 2018/7/510:35
*/
public class ComboKeyComparator extends WritableComparator{
    protected ComboKeyComparator() {
        super(ComboKey.class, true);
    }

    public int compare(WritableComparable a, WritableComparable b) {
        ComboKey k1 = (ComboKey) a;
        ComboKey k2 = (ComboKey) b;
        return k1.compareTo(k2);
    }
}
5.Mapper


package com.hadoop.mr.join;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Mapper for the join: tags each input line as a customer or an order record
 * and emits a {@link ComboKey} keyed on cid.
 *
 * <p>NOTE(review): the original post pasted ComboKeyComparator here a second
 * time; this is the CustomerJoinOrderMapper the driver actually references,
 * reconstructed from the documented input formats — customers.txt lines are
 * "cid,name" (2 fields), orders.txt lines are "id,price,cid" (3 fields).
 *
 * @author 邹培贤
 * @date 2018/7/5 10:38
 */
public class CustomerJoinOrderMapper extends Mapper<LongWritable, Text, ComboKey, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString().trim();
        // Both sample files contain blank lines; skip them instead of
        // failing in Integer.parseInt below.
        if (line.isEmpty()) {
            return;
        }
        String[] fields = line.split(",");
        ComboKey k = new ComboKey();
        if (fields.length == 2) {
            // customers.txt: cid,name  -> type 0, keep the raw line.
            k.setType(0);
            k.setCid(Integer.parseInt(fields[0]));
            k.setCustomerInfo(line);
        } else {
            // orders.txt: id,price,cid -> type 1, keep the raw line.
            k.setType(1);
            k.setOid(Integer.parseInt(fields[0]));
            k.setCid(Integer.parseInt(fields[2]));
            k.setOrderInfo(line);
        }
        context.write(k, NullWritable.get());
    }
}
6.Reducer


package com.hadoop.mr.join;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
* @author 邹培贤
* @Title: ${file_name}
* @Package ${package_name}
* @Description: ${todo}
* @date 2018/7/510:40
*/
public class CustomerJoinOrderReducer extends Reducer<ComboKey,NullWritable,Text,NullWritable> {
    @Override
    protected void reduce(ComboKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        Iterator<NullWritable> it = values.iterator();
        it.next();
        int type = key.getType();
        int cid = key.getCid();
        String cinfo = key.getCustomerInfo();
        while (it.hasNext()) {
            it.next();
            String oinfo = key.getOrderInfo();
            context.write(new Text(cinfo + "," + oinfo), NullWritable.get());
        }
    }
}
7.主函数实现


package com.hadoop.mr.join;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
* @author 邹培贤
* @Title: ${file_name}
* @Package ${package_name}
* @Description: ${todo}
* @date 2018/7/510:44
*/
public class CustomerJoinOrderApp {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        Job job=Job.getInstance(conf);
        //设置job的各种属性
        job.setJobName("CustomerJoinOrderApp");                        //作业名称
        job.setJarByClass(CustomerJoinOrderApp.class);                 //搜索类

        //添加输入路径
        FileInputFormat.addInputPath(job,new Path("F://hadoop-mr//join/"));
        //设置输出路径
        FileOutputFormat.setOutputPath(job,new Path("F://hadoop-mr//out"));

        job.setMapperClass(CustomerJoinOrderMapper.class);             //mapper类
        job.setReducerClass(CustomerJoinOrderReducer.class);           //reducer类

        //设置Map输出类型
        job.setMapOutputKeyClass(ComboKey.class);            //
        job.setMapOutputValueClass(NullWritable.class);      //

        //设置ReduceOutput类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);         //

        //设置分区类
        job.setPartitionerClass(CIDPartitioner.class);
        //设置分组对比器
        job.setGroupingComparatorClass(CIDGroupComparator.class);
        //设置排序对比器
        job.setSortComparatorClass(ComboKeyComparator.class);
        job.setNumReduceTasks(1);                           //reduce个数
        job.waitForCompletion(true);
    }
}



5 个回复

倒序浏览
奈斯,优秀
回复 使用道具 举报
回复 使用道具 举报
回复 使用道具 举报
回复 使用道具 举报
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马