以下的几种实现,采用的是本地实现,并没有放在分布式上去运行,如果方分布式,打成Jar包即可
1.全排序
全排序的意思是在生成的所有结果文件中,Key是从小到大排序的,注意,不是局部有序(某一个输出文件Key有序)而是所有的输出文件,也就是说从第一个输出文件开始到最后一个输出文件,Key是从小到大排序的。
实现方法:
1.定义1个reduce
2.自定义分区函数.
自行设置分解区间。
3.使用hadoop采样机制。
通过采样器生成分区文件,结合hadoop的TotalOrderPartitioner进行分区划分。
我们来实现一个实例,随机生成6000条天气预报数据,每条数据为年份与年份对应的温度,利用全排序将每年的最高气温输出
其中方法1,2不具有普遍性,方法3的实现如下:
数据准备:
import org.junit.Test;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Random;
/**
* @author zpx
* @Title: ${file_name}
* @Package ${package_name}
* @Description: 随机生成天气数据
* @date 2018/7/316:24
*/
public class PrepareTempData{
@Test
public void makeData() throws IOException {
FileWriter fw=new FileWriter("f:/hadoop-mr/temp.txt");
for(int i=0;i<6000;i++){
int year=1970+new Random().nextInt(100);
int temp=-30+new Random().nextInt(100);
fw.write("" + year + " " + temp + "\r\n");
}
fw.close();
}
}
Map实现:
package com.hadoop.mr.allosort;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @author zpx
* @Title: ${file_name}
* @Package ${package_name}
* @Description: 全排序最大气温的Mapper
* @date 2018/6/2811:13
*/
public class MaxTempMapper extends Mapper<IntWritable, IntWritable, IntWritable, IntWritable> {
@Override
protected void map(IntWritable key, IntWritable value, Context context) throws IOException, InterruptedException {
context.write(key,value);
}
}
Reduce实现:
package com.hadoop.mr.allosort;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @author zpx
* @Title: ${file_name}
* @Package ${package_name}
* @Description: 全排序最大气温的Reducer
* @date 2018/7/316:37
*/
public class MaxTempReducer extends Reducer<IntWritable,IntWritable,IntWritable,IntWritable>{
@Override
protected void reduce(IntWritable key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int max=Integer.MIN_VALUE;
for(IntWritable v:values){
max=max>v.get()?max:v.get();
}
context.write(key,new IntWritable(max));
}
}
主函数
package com.hadoop.mr.allosort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
/**
*
*/
public class MaxTempApp {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS","file:///");
Job job = Job.getInstance(conf);
//设置job的各种属性
job.setJobName("MaxTempApp"); //作业名称
job.setJarByClass(MaxTempApp.class); //搜索类
job.setInputFormatClass(SequenceFileInputFormat.class); //设置输入格式
//添加输入路径
FileInputFormat.addInputPath(job,new Path("F://hadoop-mr//seq//temp"));
//设置输出路径
FileOutputFormat.setOutputPath(job,new Path("F://hadoop-mr//out"));
job.setMapperClass(MaxTempMapper.class); //mapper类
job.setReducerClass(MaxTempReducer.class); //reducer类
job.setMapOutputKeyClass(IntWritable.class); //
job.setMapOutputValueClass(IntWritable.class); //
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class); //
//创建随机采样器对象
//freq:每个key被选中的概率
//numSapmple:抽取样本的总数
//maxSplitSampled:最大采样切片数
InputSampler.Sampler<IntWritable, IntWritable> sampler =
new InputSampler.RandomSampler<IntWritable, IntWritable>(0.5, 3000, 3);
job.setNumReduceTasks(3); //reduce个数
//将sample数据写入分区文件.
TotalOrderPartitioner.setPartitionFile(job.getConfiguration(),new Path("f:/hadoop-mr/seq/par.lst"));
//设置全排序分区类
job.setPartitionerClass(TotalOrderPartitioner.class);
InputSampler.writePartitionFile(job, sampler);
job.waitForCompletion(true);
}
}
2.倒排序
KV对调。
3.二次排序
Key排序(默认Map端排序),但有时候也需要对Value来进行排序。我们来用二次排序实现一个天气预报数据输出的功能,我们自己生成6000条数据,随机产生年份与年份对应的温度,然后输出每年的最高气温
(1)自定义Key
package com.hadoop.mr.secondarysort;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
* 自定义组合key
*/
public class ComboKey implements WritableComparable<ComboKey> {
private int year ;
private int temp ;
public int getYear() {
return year;
}
public void setYear(int year) {
this.year = year;
}
public int getTemp() {
return temp;
}
public void setTemp(int temp) {
this.temp = temp;
}
/**
* 对key进行比较实现
*/
public int compareTo(ComboKey o) {
int y0 = o.getYear();
int t0 = o.getTemp() ;
//年份相同(升序)
if(year == y0){
//气温降序
return -(temp - t0) ;
}
else{
return year - y0 ;
}
}
/**
* 序列化过程
*/
public void write(DataOutput out) throws IOException {
//年份
out.writeInt(year);
//气温
out.writeInt(temp);
}
public void readFields(DataInput in) throws IOException {
year = in.readInt();
temp = in.readInt();
}
public String toString() {
return year + ":" + temp ;
}
}
(2.自定义分区类,按照年份分区)
package com.hadoop.mr.secondarysort;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
/**
* @author 邹培贤
* @Title: ${file_name}
* @Package ${package_name}
* @Description: 自定义分区:按照年份来进行分区
* @date 2018/7/317:10
*/
public class YearPartitioner extends Partitioner<ComboKey,NullWritable>{
@Override
public int getPartition(ComboKey key, NullWritable nullWritable, int numPartitions){
int year = key.getYear();
return year % numPartitions;
}
}
(3.自定义分组对比器)
package com.hadoop.mr.secondarysort;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
* @author 邹培贤
* @Title: ${file_name}
* @Package ${package_name}
* @Description: 自定义分组:按照年份来进行分组
* @date 2018/7/317:13
*/
public class YearGroupComparator extends WritableComparator{
protected YearGroupComparator() {
super(ComboKey.class, true);
}
public int compare(WritableComparable a, WritableComparable b) {
ComboKey k1 = (ComboKey)a ;
ComboKey k2 = (ComboKey)b ;
return k1.getYear() - k2.getYear() ;
}
}
(4.定义Key排序对比器)
package com.hadoop.mr.secondarysort;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/**
* @Description:定义Key排序对比器将相同年份的温度最高值放在第一位
* @param ${tags}
* @return ${return_type}
* @throws
* @author 邹培贤
* @date 2018/7/3 17:17
*/
public class ComboKeyComparator extends WritableComparator {
protected ComboKeyComparator() {
super(ComboKey.class, true);
}
public int compare(WritableComparable a, WritableComparable b) {
ComboKey k1 = (ComboKey) a;
ComboKey k2 = (ComboKey) b;
return k1.compareTo(k2);
}
}
(5.Mapper)
package com.hadoop.mr.secondarysort;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
* @author 邹培贤
* @Title: ${file_name}
* @Package ${package_name}
* @Description: 二次排序的最大气温的Mapper
* @date 2018/7/317:06
*/
public class MaxTempMapper extends Mapper<LongWritable,Text,ComboKey,NullWritable>{
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] arr = value.toString().split(" ");
ComboKey keyOut=new ComboKey();
keyOut.setYear(Integer.parseInt(arr[0]));
keyOut.setTemp(Integer.parseInt(arr[1]));
context.write(keyOut,NullWritable.get());
}
}
(6.Reducer)
package com.hadoop.mr.secondarysort;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* @author 邹培贤
* @Title: ${file_name}
* @Package ${package_name}
* @Description: 二次排序的最大气温的Reducer
* @date 2018/7/317:18
*/
public class MaxTempReducer extends Reducer<ComboKey,NullWritable,IntWritable,IntWritable>{
@Override
protected void reduce(ComboKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(new IntWritable(key.getYear()),new IntWritable(key.getTemp()));
}
}
(7.主函数)
package com.hadoop.mr.secondarysort;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
*
*/
public class MaxTempApp {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
conf.set("fs.defaultFS","file:///");
Job job = Job.getInstance(conf);
//设置job的各种属性
job.setJobName("SecondarySortApp"); //作业名称
job.setJarByClass(MaxTempApp.class); //搜索类
job.setInputFormatClass(TextInputFormat.class); //设置输入格式
//添加输入路径
FileInputFormat.addInputPath(job,new Path("F://hadoop-mr//temp.txt"));
//设置输出路径
FileOutputFormat.setOutputPath(job,new Path("F://hadoop-mr//out"));
job.setMapperClass(MaxTempMapper.class); //mapper类
job.setReducerClass(MaxTempReducer.class); //reducer类
//设置Map输出类型
job.setMapOutputKeyClass(ComboKey.class); //
job.setMapOutputValueClass(NullWritable.class); //
//设置ReduceOutput类型
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class); //
//设置分区类
job.setPartitionerClass(YearPartitioner.class);
//设置分组对比器
job.setGroupingComparatorClass(YearGroupComparator.class);
//设置排序对比器
job.setSortComparatorClass(ComboKeyComparator.class);
job.setNumReduceTasks(3); //reduce个数
//
job.waitForCompletion(true);
}
}
|
|