A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

模拟数据:

[hadoop@h71 q1]$ vi aa.txt
aa 1 2
bb 2 22
cc 11
dd 1
ee 99 99999
ff 12 23123

注意:这里的分隔符是/t(Tab键)而不是空格
[hadoop@h71 q1]$ hadoop fs -put aa.txt /input


java代码:




  • import org.apache.hadoop.conf.Configuration;  



  • import org.apache.hadoop.fs.Path;  



  • import org.apache.hadoop.io.LongWritable;  



  • import org.apache.hadoop.io.NullWritable;  



  • import org.apache.hadoop.io.Text;  



  • import org.apache.hadoop.mapreduce.Job;  



  • import org.apache.hadoop.mapreduce.Mapper;  



  • import org.apache.hadoop.mapreduce.Partitioner;  



  • import org.apache.hadoop.mapreduce.Reducer;  



  • import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  



  • import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  



  • import org.apache.hadoop.util.GenericOptionsParser;  







  • public class MyPartitioner {  







  •     public static class MyPartitionerMap extends Mapper<LongWritable, Text, Text, Text> {  



  •         protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)  



  •         throws java.io.IOException, InterruptedException {  



  •             String arr_value[] = value.toString().split("\t");  



  •             if (arr_value.length > 3) {  



  •                 context.write(new Text("long"), value);  



  •             } else if (arr_value.length < 3) {  



  •                 context.write(new Text("short"), value);  



  •             } else {  



  •                 context.write(new Text("right"), value);  



  •             }  



  •         }  



  •     }  







  •     /**



  •     * partitioner的输入就是map的输出



  •     */  



  •     public static class MyPartitionerPar extends Partitioner<Text, Text> {  



  •         @Override  



  •         public int getPartition(Text key, Text value, int numPartitions) {  



  •             int result = 0;  



  •             /*********************************************************************/  



  •             /***key.toString().equals("long")  must use toString()!!!!  ***/  



  •             /***开始的时候我没有用 ,导致都在一个区里,结果也在一个reduce输出文件中。  ***/  



  •             /********************************************************************/  



  •             if (key.toString().equals("long")) {  



  •                 result = 0 % numPartitions;  



  •             } else if (key.toString().equals("short")) {  



  •                 result = 1 % numPartitions;  



  •             } else if (key.toString().equals("right")) {  



  •                 result = 2 % numPartitions;  



  •             }  



  •             return result;  



  •         }  



  •     }  







  •     public static class MyPartitionerReduce extends Reducer<Text, Text, Text, Text> {  



  •         protected void reduce(Text key, java.lang.Iterable<Text> value, Context context) throws java.io.IOException,  



  •         InterruptedException {  



  •             for (Text val : value) {  



  •                 context.write(key, val);  



  •                 //context.write(key, val);  



  •             }  



  •         }  



  •     }  







  •     public static void main(String[] args) throws Exception {  



  •         Configuration conf = new Configuration();  



  •         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();  



  •         if (otherArgs.length != 2) {  



  •             System.err.println("Usage: MyPartitioner <in> <out>");  



  •             System.exit(2);  



  •         }



  •         conf.set("mapred.jar","mp1.jar");



  •         Job job = new Job(conf, "MyPartitioner");  



  •         job.setNumReduceTasks(3);  







  •         job.setJarByClass(MyPartitioner.class);  







  •         job.setMapperClass(MyPartitionerMap.class);  







  •         job.setMapOutputKeyClass(Text.class);  



  •         job.setMapOutputValueClass(Text.class);  







  •         job.setPartitionerClass(MyPartitionerPar.class);  



  •         job.setReducerClass(MyPartitionerReduce.class);  







  •         job.setOutputKeyClass(NullWritable.class);  



  •         job.setOutputValueClass(Text.class);  







  •         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));  



  •         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));  



  •         System.exit(job.waitForCompletion(true) ? 0 : 1);  



  •     }  



  • }  



执行:

[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac MyPartitioner.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar MyPartitioner*class
[hadoop@h71 q1]$ hadoop jar xx.jar MyPartitioner /input/aa.txt /output


查看数据:

[hadoop@h71 q1]$ hadoop fs -lsr /output
rw-r--r--   2 hadoop supergroup          0 2017-03-18 22:55 /output/_SUCCESS
-rw-r--r--   2 hadoop supergroup         36 2017-03-18 22:55 /output/part-r-00000
-rw-r--r--   2 hadoop supergroup         23 2017-03-18 22:55 /output/part-r-00001
-rw-r--r--   2 hadoop supergroup         27 2017-03-18 22:55 /output/part-r-00002


[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000
long    ff      12      23      123
long    ee      99      99      999
[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00001
short   dd      1
short   cc      11
[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00002
right   bb      2       22
right   aa      1       2



【转】 https://blog.csdn.net/m0_37739193/article/details/76566636


3 个回复

倒序浏览
奈斯
回复 使用道具 举报
回复 使用道具 举报
奈斯,加油加油
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马