使用二次排序(secondary sort)实现customers(顾客信息)与orders(订单信息)的关联(join)。
本例在本地模式下运行测试,并未提交到集群(不影响使用)。
customers.txt
(cid,name)
1,zpx
2,zpx1
3,zpx2
4,zpx3
5,zpx4
6,zpx5
orders.txt
(id,price,cid)
1,12.5,1
2,12.6,1
3,12.7,2
4,12.8,2
5,12.9,4
6,12.4,3
7,12.3,4
8,12.2,2
9,12.1,1
10,12.0,2
11,22.0,3
12,23.0,4
13,24.5,1
14,26.0,5
1.组合Key
package com.hadoop.mr.join;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Composite map-output key used to join customer records with order records.
 *
 * <p>A key carries the record kind, the customer id, the order id and the raw
 * text of the customer or order line. Keys sort by customer id first; within
 * one customer the customer record (type 0) sorts before any order record,
 * and order records sort by order id.
 *
 * @author 邹培贤
 * @date 2018/7/5 10:06
 */
public class ComboKey implements WritableComparable<ComboKey> {
    /** Record kind: 0 = customer, 1 = order. */
    private int type;
    /** Customer id shared by a customer and all of its orders. */
    private int cid;
    /** Order id. */
    private int oid;
    /** Raw customer text; defaults to the empty string. */
    private String customerInfo = "";
    /** Raw order text; defaults to the empty string. */
    private String orderInfo = "";

    /**
     * Orders keys by customer id, then customer-before-order, then order id.
     *
     * <p>Uses {@link Integer#compare(int, int)} instead of subtraction so that
     * ids of opposite sign cannot overflow and flip the ordering.
     *
     * @param o the key to compare against
     * @return a negative, zero or positive value as this key sorts before,
     *         equal to, or after {@code o}
     */
    @Override
    public int compareTo(ComboKey o) {
        int byCustomer = Integer.compare(cid, o.getCid());
        if (byCustomer != 0) {
            // Different customers: customer id alone decides.
            return byCustomer;
        }
        if (type == o.getType()) {
            // Same customer, same record kind (e.g. two orders): order by order id.
            return Integer.compare(oid, o.getOid());
        }
        // Same customer, different kinds: the customer record (type 0) goes first.
        return type == 0 ? -1 : 1;
    }

    /**
     * Serializes the fields in the order type, cid, oid, customerInfo, orderInfo.
     *
     * @param out sink to write to
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(type);
        out.writeInt(cid);
        out.writeInt(oid);
        out.writeUTF(customerInfo);
        out.writeUTF(orderInfo);
    }

    /**
     * Restores the fields in the same order that {@link #write(DataOutput)} emits them.
     *
     * @param in source to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.type = in.readInt();
        this.cid = in.readInt();
        this.oid = in.readInt();
        this.customerInfo = in.readUTF();
        this.orderInfo = in.readUTF();
    }

    public int getType() {
        return type;
    }

    public void setType(int type) {
        this.type = type;
    }

    public int getCid() {
        return cid;
    }

    public void setCid(int cid) {
        this.cid = cid;
    }

    public int getOid() {
        return oid;
    }

    public void setOid(int oid) {
        this.oid = oid;
    }

    public String getCustomerInfo() {
        return customerInfo;
    }

    public void setCustomerInfo(String customerInfo) {
        this.customerInfo = customerInfo;
    }

    public String getOrderInfo() {
        return orderInfo;
    }

    public void setOrderInfo(String orderInfo) {
        this.orderInfo = orderInfo;
    }

    @Override
    public String toString() {
        return "ComboKey{" +
                "type=" + type +
                ", cid=" + cid +
                ", oid=" + oid +
                ", customerInfo='" + customerInfo + '\'' +
                ", orderInfo='" + orderInfo + '\'' +
                '}';
    }
}
2.按照cid自定义分区器(Partitioner)
package com.hadoop.mr.join;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Partitions map output by customer id, so the partition depends only on
 * {@code cid} and not on the record kind or order id.
 *
 * @author 邹培贤
 * @date 2018/7/5 10:24
 */
public class CIDPartitioner extends Partitioner<ComboKey, NullWritable> {
    /**
     * Maps a key to a partition index derived from its customer id.
     *
     * @param key           the composite key; only its cid is consulted
     * @param nullWritable  unused value
     * @param numPartitions number of partitions
     * @return an index in {@code [0, numPartitions)}
     */
    @Override
    public int getPartition(ComboKey key, NullWritable nullWritable, int numPartitions) {
        // Clear the sign bit first: a plain % on a negative cid would yield a
        // negative (invalid) partition index. Non-negative cids are unaffected.
        return (key.getCid() & Integer.MAX_VALUE) % numPartitions;
    }
}
3.按照cid自定义分组
package com.hadoop.mr.join;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Grouping comparator: two keys are treated as equal when they have the same
 * customer id, regardless of record kind or order id.
 *
 * @author 邹培贤
 * @date 2018/7/5 10:33
 */
public class CIDGroupComparator extends WritableComparator {
    /** Registers {@link ComboKey} as the key class and requests key instances. */
    protected CIDGroupComparator() {
        super(ComboKey.class, true);
    }

    /**
     * Compares two {@link ComboKey}s by customer id only.
     *
     * <p>Uses {@link Integer#compare(int, int)} rather than subtraction so that
     * ids of opposite sign cannot overflow and produce a wrong sign.
     *
     * @param a first key, expected to be a {@link ComboKey}
     * @param b second key, expected to be a {@link ComboKey}
     * @return negative, zero or positive as a's cid is less than, equal to, or
     *         greater than b's cid
     */
    public int compare(WritableComparable a, WritableComparable b) {
        ComboKey k1 = (ComboKey) a;
        ComboKey k2 = (ComboKey) b;
        return Integer.compare(k1.getCid(), k2.getCid());
    }
}
4.自定义排序对比器
package com.hadoop.mr.join;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Sort comparator for {@link ComboKey}; ordering is delegated entirely to
 * {@link ComboKey#compareTo(ComboKey)}.
 *
 * @author 邹培贤
 * @date 2018/7/5 10:35
 */
public class ComboKeyComparator extends WritableComparator {
    /** Registers {@link ComboKey} as the key class and requests key instances. */
    protected ComboKeyComparator() {
        super(ComboKey.class, true);
    }

    /**
     * Compares two keys using the natural ordering of {@link ComboKey}.
     *
     * @param a first key, expected to be a {@link ComboKey}
     * @param b second key, expected to be a {@link ComboKey}
     * @return the result of {@code a.compareTo(b)}
     */
    public int compare(WritableComparable a, WritableComparable b) {
        final ComboKey left = (ComboKey) a;
        final ComboKey right = (ComboKey) b;
        final int order = left.compareTo(right);
        return order;
    }
}
5.Mapper(注:以下代码与第4节的ComboKeyComparator重复,CustomerJoinOrderMapper的源码在此处缺失)
package com.hadoop.mr.join;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
 * Sort comparator that defers to the natural ordering of {@link ComboKey}.
 *
 * <p>NOTE(review): this listing sits under the Mapper section heading but
 * repeats the sort comparator; the CustomerJoinOrderMapper class referenced
 * by the job driver is not shown here.
 *
 * @author 邹培贤
 * @date 2018/7/5 10:35
 */
public class ComboKeyComparator extends WritableComparator {
    /** Passes {@link ComboKey} as the key class with instance creation enabled. */
    protected ComboKeyComparator() {
        super(ComboKey.class, true);
    }

    /**
     * Casts both arguments to {@link ComboKey} and compares them.
     *
     * @param a first key
     * @param b second key
     * @return whatever {@link ComboKey#compareTo(ComboKey)} returns for (a, b)
     */
    public int compare(WritableComparable a, WritableComparable b) {
        return ((ComboKey) a).compareTo((ComboKey) b);
    }
}
6.Reducer
package com.hadoop.mr.join;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.util.Iterator;
/**
 * Reducer that joins one customer record with each of that customer's orders
 * and emits one {@code customerInfo,orderInfo} line per order.
 *
 * <p>Within a group the customer key (type 0) is expected to arrive first,
 * followed by the order keys; {@link ComboKey#compareTo(ComboKey)} defines
 * that ordering.
 *
 * @author 邹培贤
 * @date 2018/7/5 10:40
 */
public class CustomerJoinOrderReducer extends Reducer<ComboKey, NullWritable, Text, NullWritable> {
    /**
     * Emits the joined lines for a single customer-id group.
     *
     * @param key     the current key; its fields are read again after every
     *                iterator step, relying on the framework refreshing the key
     *                object as the values are iterated
     * @param values  one placeholder value per record in the group
     * @param context output sink
     * @throws IOException          if writing output fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(ComboKey key, Iterable<NullWritable> values, Context context)
            throws IOException, InterruptedException {
        Iterator<NullWritable> it = values.iterator();
        if (!it.hasNext()) {
            return;
        }
        // Step onto the first record of the group, which should be the customer.
        it.next();
        if (key.getType() != 0) {
            // No customer record for this cid. Previously the first order was
            // silently swallowed and the rest were emitted with empty customer
            // info; treat the join as an inner join and emit nothing instead.
            return;
        }
        // Capture now: the key will describe an order after the next step.
        String cinfo = key.getCustomerInfo();
        Text joined = new Text();
        while (it.hasNext()) {
            it.next();
            joined.set(cinfo + "," + key.getOrderInfo());
            context.write(joined, NullWritable.get());
        }
    }
}
7.主函数实现
package com.hadoop.mr.join;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
 * Job driver for the customer/order join. Configures the job to run against
 * the local file system with a single reduce task.
 *
 * @author 邹培贤
 * @date 2018/7/5 10:44
 */
public class CustomerJoinOrderApp {
    /**
     * Configures and runs the join job.
     *
     * @param args optional: {@code args[0]} input directory, {@code args[1]}
     *             output directory; the original hard-coded paths are used for
     *             whichever argument is absent
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {
        // Fall back to the original fixed locations when no arguments are given.
        String inputDir = (args != null && args.length > 0) ? args[0] : "F://hadoop-mr//join/";
        String outputDir = (args != null && args.length > 1) ? args[1] : "F://hadoop-mr//out";

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        Job job = Job.getInstance(conf);

        // Basic job properties.
        job.setJobName("CustomerJoinOrderApp");
        job.setJarByClass(CustomerJoinOrderApp.class);

        // Input and output locations.
        FileInputFormat.addInputPath(job, new Path(inputDir));
        FileOutputFormat.setOutputPath(job, new Path(outputDir));

        job.setMapperClass(CustomerJoinOrderMapper.class);
        job.setReducerClass(CustomerJoinOrderReducer.class);

        // Map output types.
        job.setMapOutputKeyClass(ComboKey.class);
        job.setMapOutputValueClass(NullWritable.class);

        // Reduce output types.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Partition and group by customer id; sort by the full composite key.
        job.setPartitionerClass(CIDPartitioner.class);
        job.setGroupingComparatorClass(CIDGroupComparator.class);
        job.setSortComparatorClass(ComboKeyComparator.class);

        job.setNumReduceTasks(1);

        // Surface job failure through the process exit status instead of
        // discarding the result; on success main returns normally as before.
        if (!job.waitForCompletion(true)) {
            System.exit(1);
        }
    }
}
|
|