模拟数据: [hadoop@h71 q1]$ vi aa.txt
aa 1 2
bb 2 22
cc 11
dd 1
ee 99 99 999
ff 12 23 123
注意:这里的分隔符是\t(Tab键)而不是空格
[hadoop@h71 q1]$ hadoop fs -put aa.txt /input
java代码:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
/**
 * MapReduce driver demonstrating a custom {@link Partitioner}.
 *
 * <p>Each tab-separated input line is tagged by the mapper according to its
 * field count ("long", "short" or "right"). The partitioner then routes each
 * tag to its own reduce task, so that with three reducers every tag ends up
 * in a separate output file.
 */
public class MyPartitioner {

    /** Number of tab-separated fields a well-formed line has. */
    private static final int EXPECTED_FIELDS = 3;

    /** Tag for lines with more than {@link #EXPECTED_FIELDS} fields. */
    private static final String TAG_LONG = "long";

    /** Tag for lines with fewer than {@link #EXPECTED_FIELDS} fields. */
    private static final String TAG_SHORT = "short";

    /** Tag for lines with exactly {@link #EXPECTED_FIELDS} fields. */
    private static final String TAG_RIGHT = "right";

    /**
     * Tags every input line with "long", "short" or "right" depending on how
     * many tab-separated fields it contains, and emits (tag, original line).
     */
    public static class MyPartitionerMap extends Mapper<LongWritable, Text, Text, Text> {

        // context.write() serializes the key immediately, so the key objects
        // can be reused instead of allocating a new Text for every record.
        private final Text longKey = new Text(TAG_LONG);
        private final Text shortKey = new Text(TAG_SHORT);
        private final Text rightKey = new Text(TAG_RIGHT);

        /**
         * @param key     input key supplied by the input format (unused)
         * @param value   one line of input text
         * @param context sink for the (tag, line) output pair
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            // split("\t") drops trailing empty fields, so a trailing tab does
            // not count as an additional field.
            int fieldCount = value.toString().split("\t").length;

            if (fieldCount > EXPECTED_FIELDS) {
                context.write(longKey, value);
            } else if (fieldCount < EXPECTED_FIELDS) {
                context.write(shortKey, value);
            } else {
                context.write(rightKey, value);
            }
        }
    }

    /**
     * Routes each tag to a fixed partition. The partitioner's input is the
     * mapper's output.
     */
    public static class MyPartitionerPar extends Partitioner<Text, Text> {

        /**
         * @param key           tag emitted by the mapper
         * @param value         the original input line (unused)
         * @param numPartitions number of partitions passed in by the framework
         * @return 0, 1 or 2 for "long", "short" and "right" respectively, each
         *         taken modulo {@code numPartitions}; 0 for any other key
         */
        @Override
        public int getPartition(Text key, Text value, int numPartitions) {
            // The key must be converted with toString() before comparing:
            // Text.equals(String) is never true, which would send every
            // record to partition 0 and thus into a single reduce output file.
            String tag = key.toString();

            if (TAG_LONG.equals(tag)) {
                return 0 % numPartitions;
            }
            if (TAG_SHORT.equals(tag)) {
                return 1 % numPartitions;
            }
            if (TAG_RIGHT.equals(tag)) {
                return 2 % numPartitions;
            }
            return 0;
        }
    }

    /** Identity reducer: writes every (tag, line) pair it receives unchanged. */
    public static class MyPartitionerReduce extends Reducer<Text, Text, Text, Text> {

        /**
         * @param key     tag shared by all values in this group
         * @param value   the lines that were given this tag
         * @param context sink for the (tag, line) output pairs
         */
        @Override
        protected void reduce(Text key, Iterable<Text> value, Context context)
                throws java.io.IOException, InterruptedException {
            for (Text line : value) {
                context.write(key, line);
            }
        }
    }

    /**
     * Configures and runs the job.
     *
     * @param args generic Hadoop options followed by {@code <in> <out>}
     * @throws Exception if job setup or execution fails
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Usage: MyPartitioner <in> <out>");
            System.exit(2);
        }

        // The job jar is located through setJarByClass below; no jar file
        // name is hard-coded in the configuration.
        Job job = Job.getInstance(conf, "MyPartitioner");
        job.setJarByClass(MyPartitioner.class);

        // One reduce task per tag so each tag gets its own output file.
        job.setNumReduceTasks(3);

        job.setMapperClass(MyPartitionerMap.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setPartitionerClass(MyPartitionerPar.class);

        job.setReducerClass(MyPartitionerReduce.class);
        // Must match the reducer's declared output types (Text, Text).
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
执行:
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac MyPartitioner.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar MyPartitioner*class
[hadoop@h71 q1]$ hadoop jar xx.jar MyPartitioner /input/aa.txt /output
查看数据: [hadoop@h71 q1]$ hadoop fs -lsr /output
-rw-r--r-- 2 hadoop supergroup 0 2017-03-18 22:55 /output/_SUCCESS
-rw-r--r-- 2 hadoop supergroup 36 2017-03-18 22:55 /output/part-r-00000
-rw-r--r-- 2 hadoop supergroup 23 2017-03-18 22:55 /output/part-r-00001
-rw-r--r-- 2 hadoop supergroup 27 2017-03-18 22:55 /output/part-r-00002
[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000
long ff 12 23 123
long ee 99 99 999
[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00001
short dd 1
short cc 11
[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00002
right bb 2 22
right aa 1 2
【转】 https://blog.csdn.net/m0_37739193/article/details/76566636
|