A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

© 不二晨 金牌黑马   /  2018-8-22 09:55  /  1189 人查看  /  3 人回复  /   0 人收藏 转载请遵从CC协议 禁止商业使用本文

情况1:
[hadoop@h71 q1]$ vi ip.txt
192.168.1.1 aaa
192.168.1.1 aaa
192.168.1.1 aaa
192.168.1.1 aaa
192.168.1.1 aaa
192.168.1.1 aaa
192.168.1.1 aaa
192.168.2.2 ccc
192.168.3.3 ddd
192.168.3.3 ddd
192.168.3.3 ddd
192.168.5.5 fff
[hadoop@h71 q1]$ hadoop fs -put ip.txt /input

java代码(将IP统计并升序输出):




  • import java.io.IOException;



  • import java.util.Iterator;







  • import org.apache.hadoop.fs.Path;



  • import org.apache.hadoop.io.LongWritable;



  • import org.apache.hadoop.io.Text;



  • import org.apache.hadoop.mapred.FileInputFormat;



  • import org.apache.hadoop.mapred.FileOutputFormat;



  • import org.apache.hadoop.mapred.JobClient;



  • import org.apache.hadoop.mapred.JobConf;



  • import org.apache.hadoop.mapred.MapReduceBase;



  • import org.apache.hadoop.mapred.Mapper;



  • import org.apache.hadoop.mapred.OutputCollector;



  • import org.apache.hadoop.mapred.Reducer;



  • import org.apache.hadoop.mapred.Reporter;



  • import org.apache.hadoop.mapred.TextInputFormat;



  • import org.apache.hadoop.mapred.TextOutputFormat;







  • public class IpTopK {







  •     public static class IpTopKMapper1 extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {



  •         @Override



  •         public void map(LongWritable longWritable, Text text, OutputCollector<Text, Text>



  •                 outputCollector, Reporter reporter) throws IOException {



  •             String ip = text.toString().split(" ", 5)[0];



  •             outputCollector.collect(new Text(ip), new Text("1"));



  •         }



  •     }







  •     public static class IpTopKReducer1 extends MapReduceBase implements Reducer<Text, Text, Text, Text> {



  •         @Override



  •         public void reduce(Text key, Iterator<Text> iterator, OutputCollector<Text, Text>



  •                 outputCollector, Reporter reporter) throws IOException {



  •             long sum = 0;



  •             while(iterator.hasNext()){



  •                 sum = sum + Long.parseLong(iterator.next().toString());



  •             }



  •             outputCollector.collect(new Text(key), new Text(String.valueOf(sum)));



  •             /**



  •              * ip1 count



  •              * ip2 count



  •              * ip3 count



  •              */



  •         }



  •     }











  •     public static class IpTopKMapper2 extends MapReduceBase implements Mapper<LongWritable, Text, LongWritable, Text> {



  •         @Override



  •         public void map(LongWritable longWritable, Text text, OutputCollector<LongWritable, Text>



  •                 outputCollector, Reporter reporter) throws IOException {



  •             String [] ks = text.toString().split("\t");



  •             /**



  •              * ks[0] , ip



  •              * ks[1], count



  •              */



  •             outputCollector.collect(new LongWritable(Long.parseLong(ks[1])), new Text(ks[0]));



  •         }



  •     }







  •     public static class IpTopKReducer2 extends MapReduceBase implements Reducer<LongWritable, Text, LongWritable, Text> {



  •         @Override



  •         public void reduce(LongWritable key, Iterator<Text> iterator, OutputCollector<LongWritable, Text>



  •                 outputCollector, Reporter reporter) throws IOException {







  •             while(iterator.hasNext()){



  •                 outputCollector.collect(key, iterator.next());



  •             }



  •         }



  •     }







  •     public static void main(String [] args) throws IOException {



  •         System.out.println(args.length);



  •         if(args.length < 2){



  •             System.out.println("args not right!");



  •             return ;



  •         }







  •         JobConf conf = new JobConf(IpTopK.class);



  •         conf.set("mapred.jar","tt.jar");



  •         //set output key class



  •         conf.setOutputKeyClass(Text.class);



  •         conf.setOutputValueClass(Text.class);







  •         //set mapper & reducer class



  •         conf.setMapperClass(IpTopKMapper1.class);



  •         conf.setCombinerClass(IpTopKReducer1.class);



  •         conf.setReducerClass(IpTopKReducer1.class);







  •         // set format



  •         conf.setInputFormat(TextInputFormat.class);



  •         conf.setOutputFormat(TextOutputFormat.class);







  •         String inputDir = args[0];



  •         String outputDir = args[1];







  •         // FileInputFormat.setInputPaths(conf, "/user/hadoop/rongxin/locationinput/");



  •         FileInputFormat.setInputPaths(conf, inputDir);



  •         FileOutputFormat.setOutputPath(conf, new Path(outputDir));







  •         boolean flag = JobClient.runJob(conf).isSuccessful();







  •         if(flag){



  •             System.out.println("run job-1 successful");



  •             JobConf conf1 = new JobConf(IpTopK.class);



  •             conf1.set("mapred.jar","tt.jar");



  •             //set output key class



  •             conf1.setOutputKeyClass(LongWritable.class);



  •             conf1.setOutputValueClass(Text.class);



  •             //set mapper & reducer class



  •             conf1.setMapperClass(IpTopKMapper2.class);



  •             conf1.setReducerClass(IpTopKReducer2.class);



  •             // set format



  •             conf1.setInputFormat(TextInputFormat.class);



  •             conf1.setOutputFormat(TextOutputFormat.class);



  •             conf1.setNumReduceTasks(1);







  •             // FileInputFormat.setInputPaths(conf, "/user/hadoop/rongxin/locationinput/");



  •             FileInputFormat.setInputPaths(conf1, outputDir);



  •             FileOutputFormat.setOutputPath(conf1, new Path(outputDir + "-2"));



  •             boolean flag1 = JobClient.runJob(conf1).isSuccessful();



  •             if(flag1){



  •                 System.out.println("run job-2 successful !!");



  •             }



  •         }



  •     }



  • }


注意:这个是hadoop1版本的代码,并且该代码执行了两个mapreduce任务


在Linux中执行该代码:
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac IpTopK.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf tt.jar IpTopK*class(这个tt.jar还必须得和代码中的相对应)
[hadoop@h71 q1]$ hadoop jar tt.jar IpTopK /input/ip.txt /output


[hadoop@h71 q1]$ hadoop fs -lsr /output
-rw-r--r--   2 hadoop supergroup          0 2017-03-18 16:07 /output/_SUCCESS
-rw-r--r--   2 hadoop supergroup         56 2017-03-18 16:07 /output/part-00000
[hadoop@h71 q1]$ hadoop fs -lsr /output-2
-rw-r--r--   2 hadoop supergroup          0 2017-03-18 16:07 /output-2/_SUCCESS
-rw-r--r--   2 hadoop supergroup         56 2017-03-18 16:07 /output-2/part-00000
[hadoop@h71 q1]$ hadoop fs -cat /output/part-00000
192.168.1.1     7
192.168.2.2     1
192.168.3.3     3
192.168.5.5     1
[hadoop@h71 q1]$ hadoop fs -cat /output-2/part-00000
1       192.168.5.5
1       192.168.2.2
3       192.168.3.3
7       192.168.1.1


情况2:降序
[hadoop@h71 q1]$ vi test.txt
a 5
b 4
c 74
d 78
e 1
r 64
f 4
注意:分隔符/t(Tab键)或者空格都可以
[hadoop@h71 q1]$ hadoop fs -put test.txt /input

java代码:




  • import java.io.IOException;



  • import java.util.StringTokenizer;







  • import org.apache.hadoop.conf.Configuration;



  • import org.apache.hadoop.fs.Path;



  • import org.apache.hadoop.io.IntWritable;



  • import org.apache.hadoop.io.LongWritable;



  • import org.apache.hadoop.io.Text;



  • import org.apache.hadoop.io.WritableComparable;



  • import org.apache.hadoop.io.WritableComparator;



  • import org.apache.hadoop.mapreduce.Job;



  • import org.apache.hadoop.mapreduce.Mapper;



  • import org.apache.hadoop.mapreduce.Partitioner;



  • import org.apache.hadoop.mapreduce.Reducer;



  • import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;



  • import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;







  • public class JiangXu {







  •         public static class SortIntValueMapper extends Mapper<LongWritable, Text, IntWritable, Text> {



  •                 private final static IntWritable wordCount = new IntWritable(1);



  •                 private Text word = new Text();



  •                 public SortIntValueMapper() {



  •                         super();



  •                 }



  •                 @Override



  •                 public void map(LongWritable key, Text value, Context context)



  •                 throws IOException, InterruptedException {







  •                         StringTokenizer tokenizer = new StringTokenizer(value.toString());



  •                         while (tokenizer.hasMoreTokens()) {



  •                                 word.set(tokenizer.nextToken().trim());



  •                                 wordCount.set(Integer.valueOf(tokenizer.nextToken().trim()));



  •                                 context.write(wordCount, word);



  •                         }



  •                 }



  •         }







  •         /**



  •         * 按照key的大小来划分区间,当然,key是int值



  •         */



  •         public static class KeySectionPartitioner<K, V> extends Partitioner<K, V> {



  •             @Override



  •             public int getPartition(K key, V value, int numReduceTasks) {



  •                 /**



  •                  * int值的hashcode还是自己本身的数值



  •                  */



  •                 //这里我认为大于maxValue的就应该在第一个分区



  •                 int maxValue = 50;



  •                 int keySection = 0;



  •                 // 只有传过来的key值大于maxValue 并且numReduceTasks比如大于1个才需要分区,否则直接返回0



  •                 if (numReduceTasks > 1 && key.hashCode() < maxValue) {



  •                     int sectionValue = maxValue / (numReduceTasks - 1);



  •                     int count = 0;



  •                     while ((key.hashCode() - sectionValue * count) > sectionValue) {



  •                         count++;



  •                     }



  •                     keySection = numReduceTasks - 1 - count;



  •                 }



  •                 return keySection;



  •             }



  •         }







  •         /**



  •         * int的key按照降序排列



  •         */



  •         public static class IntKeyDescComparator extends WritableComparator {



  •             protected IntKeyDescComparator() {



  •                 super(IntWritable.class, true);



  •             }



  •             @Override



  •             public int compare(WritableComparable a, WritableComparable b) {



  •                 return -super.compare(a, b);



  •             }



  •         }







  •         /**



  •         * 把key和value颠倒过来输出



  •         */



  •         public static class SortIntValueReduce extends Reducer<IntWritable, Text, Text, IntWritable> {



  •             private Text result = new Text();



  •             @Override



  •             public void reduce(IntWritable key, Iterable<Text> values, Context context)



  •                     throws IOException, InterruptedException {



  •                 for (Text val : values) {



  •                     result.set(val.toString());



  •                     context.write(result, key);



  •                 }



  •             }



  •         }







  •     public static void main(String [] args) throws Exception {



  •         /**



  •          * 这里是map输出的key和value类型



  •          */



  •                    Configuration conf = new Configuration();



  •                    Job job = new Job(conf, "word count");



  •                    job.setJarByClass(JiangXu.class);



  •                    job.setMapperClass(SortIntValueMapper.class);



  •                    job.setSortComparatorClass(IntKeyDescComparator.class);



  •                    job.setPartitionerClass(KeySectionPartitioner.class);



  •                    job.setReducerClass(SortIntValueReduce.class);



  •                    job.setOutputKeyClass(IntWritable.class);



  •                    job.setOutputValueClass(Text.class);



  •                    /**



  •         *这里可以放输入目录数组,也就是可以把上一个job所有的结果都放进去



  •         **/



  •                    FileInputFormat.setInputPaths(job, new Path(args[0]));



  •                    FileOutputFormat.setOutputPath(job, new Path(args[1]));



  •                    System.exit(job.waitForCompletion(true) ? 0 : 1);



  •     }



  • }



[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac JiangXu.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar JiangXu*class
[hadoop@h71 q1]$ hadoop jar xx.jar JiangXu /input/test.txt /output


[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000
d       78
c       74
r       64
a       5
f       4
b       4
e       1


情况3:改进型的WordCount(按词频倒排),官网示例WordCount只统计出单词出现的次数,并未按词频做倒排,下面的代码示例实现了该功能
来自:http://www.cnblogs.com/yjmyzz/p/hadoop-mapreduce-2-sample.html
原理: 依然用到了cleanup,此外为了实现排序,采用了TreeMap这种内置了key排序的数据结构.
这里为了展示更直观,选用了电影<超能陆战队>主题曲的第一段歌词做为输入:
[hadoop@h71 q1]$ vi test.txt
They say we are what we are
But we do not have to be
I am  bad behavior but I do it in the best way
I will be the watcher
Of the eternal flame
I will be the guard dog
of all your fever dreams
[hadoop@h71 q1]$ hadoop fs -put test.txt /input


java代码:




  • import java.io.IOException;



  • import java.util.Comparator;



  • import java.util.StringTokenizer;



  • import java.util.TreeMap;







  • import org.apache.hadoop.conf.Configuration;



  • import org.apache.hadoop.fs.Path;



  • import org.apache.hadoop.io.IntWritable;



  • import org.apache.hadoop.io.Text;



  • import org.apache.hadoop.mapreduce.Job;



  • import org.apache.hadoop.mapreduce.Mapper;



  • import org.apache.hadoop.mapreduce.Reducer;



  • import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;



  • import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



  • import org.apache.hadoop.util.GenericOptionsParser;







  • public class WordCount2 {



  •     public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {



  •         private final static IntWritable one = new IntWritable(1);



  •         private Text word = new Text();



  •         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {



  •             StringTokenizer itr = new StringTokenizer(value.toString());



  •             while (itr.hasMoreTokens()) {



  •                 word.set(itr.nextToken());



  •                 context.write(word, one);



  •             }



  •         }



  •     }







  •     public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {



  •         //定义treeMap来保持统计结果,由于treeMap是按key升序排列的,这里要人为指定Comparator以实现倒排



  •         private TreeMap<Integer, String> treeMap = new TreeMap<Integer, String>(new Comparator<Integer>() {



  •             @Override



  •             public int compare(Integer x, Integer y) {



  •                 return y.compareTo(x);



  •             }



  •         });



  •         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {



  •             //reduce后的结果放入treeMap,而不是向context中记入结果



  •             int sum = 0;



  •             for (IntWritable val : values) {



  •                 sum += val.get();



  •             }



  •             if (treeMap.containsKey(sum)){



  •                 String value = treeMap.get(sum) + "," + key.toString();



  •                 treeMap.put(sum,value);



  •             }



  •             else {



  •                 treeMap.put(sum, key.toString());



  •             }



  •         }



  •         protected void cleanup(Context context) throws IOException, InterruptedException {



  •             //将treeMap中的结果,按value-key顺序写入contex中



  •             for (Integer key : treeMap.keySet()) {



  •                 context.write(new Text(treeMap.get(key)), new IntWritable(key));



  •             }



  •         }



  •     }







  •     public static void main(String[] args) throws Exception {



  •         Configuration conf = new Configuration();



  •         String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();



  •         if (otherArgs.length < 2) {



  •             System.err.println("Usage: wordcount2 <in> [<in>...] <out>");



  •             System.exit(2);



  •         }







  •         Job job = Job.getInstance(conf, "word count2");



  •         job.setJarByClass(WordCount2.class);



  •         job.setMapperClass(TokenizerMapper.class);



  •         job.setCombinerClass(IntSumReducer.class);



  •         job.setReducerClass(IntSumReducer.class);



  •         job.setOutputKeyClass(Text.class);



  •         job.setOutputValueClass(IntWritable.class);



  •         for (int i = 0; i < otherArgs.length - 1; ++i) {



  •             FileInputFormat.addInputPath(job, new Path(otherArgs));



  •         }



  •         FileOutputFormat.setOutputPath(job,



  •                 new Path(otherArgs[otherArgs.length - 1]));



  •         System.exit(job.waitForCompletion(true) ? 0 : 1);



  •     }



  • }



[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac WordCount2.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar WordCount2*class
[hadoop@h71 q1]$ hadoop jar xx.jar WordCount2 /input/test.txt /output


[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000
I,the   4
be,we   3
are,do,will     2
But,Of,They,all,am,bad,behavior,best,but,dog,dreams,eternal,fever,flame,guard,have,in,it,not,of,say,to,watcher,way,what,your    1


情况4:自定义排序

对给出的两列数据首先按照第一列升序排列,当第一列相同时,第二列升序排列

如果利用mapreduce过程的自动排序,只能实现根据第一列排序,现在需要自定义一个继承自WritableComparable接口的类,用该类作为key,就可以利用mapreduce过程的自动排序了。


数据格式:
[hadoop@h71 q1]$ vi haha.txt
7 3
7 5
7 1
5 9
5 6
1 7


Java代码:




  • import java.io.DataInput;  



  • import java.io.DataOutput;  



  • import java.io.IOException;  



  • import java.net.URI;  







  • import org.apache.hadoop.conf.Configuration;  



  • import org.apache.hadoop.fs.FileSystem;  



  • import org.apache.hadoop.fs.Path;  



  • import org.apache.hadoop.io.LongWritable;  



  • import org.apache.hadoop.io.Text;  



  • import org.apache.hadoop.io.WritableComparable;  



  • import org.apache.hadoop.mapreduce.Job;  



  • import org.apache.hadoop.mapreduce.Mapper;  



  • import org.apache.hadoop.mapreduce.Reducer;  



  • import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  



  • import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  



  • import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  



  • import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  



  • import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;  







  • public class ZiDingYi {



  •     private static final String INPUT_PATH = "hdfs://h71:9000/in";  



  •     private static final String OUT_PATH = "hdfs://h71:9000/out";  







  •     public static void main(String[] args) throws Exception {  



  •         Configuration conf = new Configuration();  



  •         FileSystem fileSystem = FileSystem.get(new URI(INPUT_PATH), conf);  



  •         if(fileSystem.exists(new Path(OUT_PATH))){  



  •             fileSystem.delete(new Path(OUT_PATH),true);  



  •         }  



  •         Job job = new Job(conf,ZiDingYi.class.getSimpleName());  



  •         FileInputFormat.setInputPaths(job, INPUT_PATH);  



  •         job.setJarByClass(ZiDingYi.class);



  • //上面这行必须加,不然会报错:Caused by: java.lang.ClassNotFoundException: Class ZiDingYi$MyMapper not found







  •         //指定哪个类用来格式化输入文件  



  •         job.setInputFormatClass(TextInputFormat.class);  



  •         //指定自定义的Mapper类  



  •         job.setMapperClass(MyMapper.class);  



  •         //指定输出<k2,v2>的类型  



  •         job.setMapOutputKeyClass(newK2.class);  



  •         job.setMapOutputValueClass(LongWritable.class);  







  •         //指定分区类  



  •         job.setPartitionerClass(HashPartitioner.class);  



  •         job.setNumReduceTasks(1);  







  •         //指定自定义的reduce类  



  •         job.setReducerClass(MyReducer.class);  



  •         //指定输出<k3,v3>的类型  



  •         job.setOutputKeyClass(LongWritable.class);  



  •         job.setOutputValueClass(LongWritable.class);  







  •         FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));  



  •         //设定输出文件的格式化类  



  •         job.setOutputFormatClass(TextOutputFormat.class);  







  •         //把代码提交给JobTracker执行  



  •         job.waitForCompletion(true);  



  •     }  







  •     static class MyMapper extends Mapper<LongWritable,Text, newK2,LongWritable>{  



  •         @Override



  •         protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {



  •             String[] splied = value.toString().split(" ");



  •             newK2 k2 = new newK2(Long.parseLong(splied[0]),Long.parseLong(splied[1]));



  •             final LongWritable v2 = new LongWritable(Long.parseLong(splied[1]));



  •             context.write(k2, v2);



  •         }



  •     }







  •     static class MyReducer extends Reducer<newK2, LongWritable, LongWritable, LongWritable>{  



  •         @Override  



  •         protected void reduce(ZiDingYi.newK2 key, Iterable<LongWritable> value, Context context) throws IOException, InterruptedException {  



  •             context.write(new LongWritable(key.first), new LongWritable(key.second));  



  •         }  



  •     }  







  •     static class newK2 implements WritableComparable<newK2>{  



  •         Long first;  



  •         Long second;  



  •         public newK2(long first, long second) {  



  •             this.first = first;  



  •             this.second = second;  



  •         }  







  •         public newK2() {  



  •         }  //这个方法还不能删掉,否则报错:Caused by: java.lang.RuntimeException: java.lang.NoSuchMethodException: ZiDingYi$newK2.<init>()







  •         @Override  



  •         public void readFields(DataInput input) throws IOException {  



  •             this.first = input.readLong();  



  •             this.second = input.readLong();  



  •         }  







  •         @Override  



  •         public void write(DataOutput out) throws IOException {  



  •             out.writeLong(first);  



  •             out.writeLong(second);  



  •         }   



  •         //当第一列不同时,升序;当第一列相同时,第二列升序







  •         @Override  



  •         public int compareTo(newK2 o) {  



  •             long temp = this.first -o.first;  



  •             if(temp!=0){  



  •                 return (int)temp;  



  •             }  



  •             return (int)(this.second -o.second);  



  •         }  







  •         @Override  



  •         public int hashCode() {  



  •             return this.first.hashCode()+this.second.hashCode();  



  •         }  







  •         @Override  



  •         public boolean equals(Object obj) {  



  •             if(!(obj instanceof newK2)){  



  •                 return false;  



  •             }  



  •             newK2 k2 = (newK2)obj;  



  •             return(this.first == k2.first)&&(this.second == k2.second);  



  •         }  



  •     }  



  • }


注意:KeyValue 中的first second属性必须写成Long类型,而不是long,否则 this.first.hashCode()不成立。对任何实现WritableComparable的类都能进行排序,这可以一些复杂的数据,只要把他们封装成实现了WritableComparable的类作为key就可以了


运行程序后查看结果:

[hadoop@h71 q1]$ hadoop fs -cat /out/part-r-00000
1       7
5       6
5       9
7       1
7       3
7       5



【转载】 https://blog.csdn.net/m0_37739193/article/details/76090816


3 个回复

正序浏览
奈斯,加油加油
回复 使用道具 举报
回复 使用道具 举报
奈斯
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马