A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

方法一:

在Hbase中建立相应的表1:




  • create 'hello','cf'



  • put 'hello','1','cf:hui','hello world'



  • put 'hello','2','cf:hui','hello hadoop'



  • put 'hello','3','cf:hui','hello hive'



  • put 'hello','4','cf:hui','hello hadoop'



  • put 'hello','5','cf:hui','hello world'



  • put 'hello','6','cf:hui','hello world'



  • put 'hello','7','cf:hui','hbase hive'



java代码:




  • import java.io.IOException;



  • import java.util.Comparator;



  • import java.util.TreeMap;







  • import org.apache.hadoop.conf.Configuration;



  • import org.apache.hadoop.fs.Path;



  • import org.apache.hadoop.hbase.HBaseConfiguration;



  • import org.apache.hadoop.hbase.client.Result;



  • import org.apache.hadoop.hbase.client.Scan;



  • import org.apache.hadoop.hbase.io.ImmutableBytesWritable;



  • import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;



  • import org.apache.hadoop.hbase.mapreduce.TableMapper;



  • import org.apache.hadoop.hbase.util.Bytes;



  • import org.apache.hadoop.io.IntWritable;



  • import org.apache.hadoop.io.Text;



  • import org.apache.hadoop.io.Writable;



  • import org.apache.hadoop.io.WritableComparable;



  • import org.apache.hadoop.mapreduce.Job;



  • import org.apache.hadoop.mapreduce.Reducer;



  • import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



  • import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;



  • import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;







  • public class HbaseTopJiang1 {  



  •     public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {  



  •             String tablename = "hello";



  •             Configuration conf = HBaseConfiguration.create();



  •             conf.set("hbase.zookeeper.quorum", "h71");



  •             Job job = new Job(conf, "WordCountHbaseReader");



  •             job.setJarByClass(HbaseTopJiang1.class);



  •             Scan scan = new Scan();



  •             TableMapReduceUtil.initTableMapperJob(tablename,scan,doMapper.class, Text.class, IntWritable.class, job);



  •             job.setReducerClass(WordCountHbaseReaderReduce.class);



  •             FileOutputFormat.setOutputPath(job, new Path(args[0]));



  •             MultipleOutputs.addNamedOutput(job, "hdfs", TextOutputFormat.class, WritableComparable.class, Writable.class);



  •             System.exit(job.waitForCompletion(true) ? 0 : 1);



  •     }  







  •     public static class doMapper extends TableMapper<Text, IntWritable>{  



  •         private final static IntWritable one = new IntWritable(1);



  •         private Text word = new Text();



  •         @Override  



  •         protected void map(ImmutableBytesWritable key, Result value,  



  •                 Context context) throws IOException, InterruptedException {



  •                 /*不进行分隔,将value整行全部获取



  •                         String rowValue = Bytes.toString(value.list().get(0).getValue());



  •                   context.write(new Text(rowValue), one);



  •                 */



  •                 String[] rowValue = Bytes.toString(value.list().get(0).getValue()).split(" ");



  •                  for (String str: rowValue){



  •                        word.set(str);



  •                        context.write(word,one);



  •                 }



  •         }  



  •     }  







  •     public static final int K = 3;



  •     public static class WordCountHbaseReaderReduce extends Reducer<Text, IntWritable, Text, IntWritable> {



  •         //定义treeMap来保持统计结果,由于treeMap是按key升序排列的,这里要人为指定Comparator以实现倒排



  •         private TreeMap<Integer, String> treeMap = new TreeMap<Integer, String>(new Comparator<Integer>() {



  •             @Override



  •             public int compare(Integer x, Integer y) {



  •                 return y.compareTo(x);



  •             }



  •         });



  •         public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {



  •             //reduce后的结果放入treeMap,而不是向context中记入结果



  •             int sum = 0;



  •             for (IntWritable val : values) {



  •                 sum += val.get();



  •             }



  •             if (treeMap.containsKey(sum)){



  •                 String value = treeMap.get(sum) + "," + key.toString();



  •                 treeMap.put(sum,value);



  •             }else {



  •                 treeMap.put(sum, key.toString());



  •             }



  •                         if(treeMap.size() > K) {



  •                                 treeMap.remove(treeMap.lastKey());



  •                         }  



  •         }



  •         protected void cleanup(Context context) throws IOException, InterruptedException {



  •             //将treeMap中的结果,按value-key顺序写入contex中



  •             for (Integer key : treeMap.keySet()) {



  •                 context.write(new Text(treeMap.get(key)), new IntWritable(key));



  •             }



  •         }



  •     }



  • }  



在Linux中执行该代码:
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac HbaseTopJiang1.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HbaseTopJiang1*class
[hadoop@h71 q1]$ hadoop jar xx.jar HbaseTopJiang1 /out

[hadoop@h71 q1]$ hadoop fs -cat /out/part-r-00000
hello   6
world   3
hadoop,hive     2


方法二:



  • truncate 'hello'



  • put 'hello','1','cf:hui','hello world        world'



  • put 'hello','2','cf:hui','hello hadoop        hadoop'



  • put 'hello','3','cf:hui','hello hive        hive'



  • put 'hello','4','cf:hui','hello hadoop        hadoop'



  • put 'hello','5','cf:hui','hello world        world'



  • put 'hello','6','cf:hui','hello world        world'



  • put 'hello','7','cf:hui','hbase hive        hive'


注意:相同单词之间的分隔符是"/t"(Tab键),结果hbase中插入数据的时候根本就不能插入制表符,所以该方法破产,可以参考一下思想

java代码:


  • import java.io.IOException;



  • import java.util.TreeMap;







  • import org.apache.hadoop.conf.Configuration;



  • import org.apache.hadoop.fs.Path;



  • import org.apache.hadoop.hbase.HBaseConfiguration;



  • import org.apache.hadoop.hbase.client.Result;



  • import org.apache.hadoop.hbase.client.Scan;



  • import org.apache.hadoop.hbase.io.ImmutableBytesWritable;



  • import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;



  • import org.apache.hadoop.hbase.mapreduce.TableMapper;



  • import org.apache.hadoop.hbase.util.Bytes;



  • import org.apache.hadoop.io.IntWritable;



  • import org.apache.hadoop.io.LongWritable;



  • import org.apache.hadoop.io.Text;



  • import org.apache.hadoop.io.Writable;



  • import org.apache.hadoop.io.WritableComparable;



  • import org.apache.hadoop.io.WritableComparator;



  • import org.apache.hadoop.mapreduce.Job;



  • import org.apache.hadoop.mapreduce.Mapper;



  • import org.apache.hadoop.mapreduce.Partitioner;



  • import org.apache.hadoop.mapreduce.Reducer;



  • import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;



  • import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



  • import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;



  • import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;







  • public class HbaseTopJiang2{



  •         public static class doMapper extends TableMapper<Text, IntWritable>{  



  •                 private final static IntWritable one = new IntWritable(1);



  •                 private Text word = new Text();



  •                 @Override  



  •                 protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {



  •                 /*不进行分隔,将value整行全部获取



  •                         String rowValue = Bytes.toString(value.list().get(0).getValue());



  •                   context.write(new Text(rowValue), one);



  •                 */



  •                         String[] rowValue = Bytes.toString(value.list().get(0).getValue()).split(" ");



  •                         for (String str: rowValue){



  •                                 word.set(str);



  •                                 context.write(word,one);



  •                         }



  •                 }  



  •         }   







  •         public static class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable> {



  •         @Override



  •         public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {



  •                 int total=0;



  •                 for (IntWritable val : values){



  •                         total++;



  •                 }



  •                 context.write(key, new IntWritable(total));



  •                 }   



  •         }







  •         public static final int K = 3;  



  •         /**



  •         * 把上一个mapreduce的结果的key和value颠倒,调到后就可以按照key排序了。



  •         */



  •         public static class KMap extends Mapper<LongWritable,Text,IntWritable,Text> {



  •                 TreeMap<Integer, String> map = new TreeMap<Integer, String>();



  •                 @Override



  •                 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {



  •                         String line = value.toString();



  •                         String result[] = line.split("\t");



  •                         StringBuffer hui = null;



  •                         if(result.length > 2){        //我怕在往hbase表输入数据时带\t分隔符的,后来发现hbase中插入数据的时候根本就不能插入制表符



  •                                 for(int i=0;i<result.length-2;i++){



  •                                         hui=new StringBuffer().append(result);



  •                                 }



  •                         }else{



  •                                 hui = new StringBuffer().append(result[0]);



  •                         }



  •                         if(line.trim().length() > 0 && line.indexOf("\t") != -1) {



  •                                 String[] arr = line.split("\t", 2);



  •                                 String name = arr[0];  



  •                                 Integer num = Integer.parseInt(arr[1]);  



  •                         if (map.containsKey(num)){



  •                             String value1 = map.get(num) + "," + hui;



  •                             map.put(num,value1);



  •                         }



  •                         else {



  •                             map.put(num, hui.toString());



  •                         }



  •                                 if(map.size() > K) {



  •                                         map.remove(map.firstKey());  



  •                                 }  



  •                         }  



  •                 }  



  •                 @Override



  •                 protected void cleanup(Mapper<LongWritable, Text, IntWritable, Text>.Context context)  



  •                                 throws IOException, InterruptedException {  



  •                         for(Integer num : map.keySet()) {  



  •                                 context.write(new IntWritable(num), new Text(map.get(num)));



  •                         }  



  •                 }  



  •         }







  •         /**



  •         * 按照key的大小来划分区间,当然,key是int值



  •         */



  •         public static class KeySectionPartitioner<K, V> extends Partitioner<K, V> {



  •                 @Override



  •                 public int getPartition(K key, V value, int numReduceTasks) {



  •                         /**



  •                          * int值的hashcode还是自己本身的数值



  •                          */



  •                         //这里我认为大于maxValue的就应该在第一个分区



  •                         int maxValue = 50;



  •                         int keySection = 0;



  •                         // 只有传过来的key值大于maxValue 并且numReduceTasks比如大于1个才需要分区,否则直接返回0



  •                         if (numReduceTasks > 1 && key.hashCode() < maxValue) {



  •                                 int sectionValue = maxValue / (numReduceTasks - 1);



  •                                 int count = 0;



  •                                 while ((key.hashCode() - sectionValue * count) > sectionValue) {



  •                                         count++;



  •                                 }



  •                                 keySection = numReduceTasks - 1 - count;



  •                         }



  •                         return keySection;



  •                 }



  •         }







  •         /**



  •         * int的key按照降序排列



  •         */



  •         public static class IntKeyDescComparator extends WritableComparator {



  •                 protected IntKeyDescComparator() {



  •                         super(IntWritable.class, true);



  •                 }



  •                 @Override



  •                 public int compare(WritableComparable a, WritableComparable b) {



  •                         return -super.compare(a, b);



  •                 }



  •         }







  •         /**



  •         * 把key和value颠倒过来输出



  •         */



  •         public static class SortIntValueReduce extends Reducer<IntWritable, Text, Text, IntWritable> {



  •                 private Text result = new Text();



  •                 @Override



  •                 public void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {



  •                         for (Text val : values) {



  •                                 result.set(val.toString());



  •                                 context.write(result, key);



  •                         }



  •                 }



  •         }







  •         public static void main(String[] args) throws Exception {



  •                 String tablename = "hello";



  •                 Configuration conf = HBaseConfiguration.create();



  •                 conf.set("hbase.zookeeper.quorum", "h71");



  •                 Job job1 = new Job(conf, "WordCountHbaseReader");



  •                 job1.setJarByClass(HbaseTopJiang2.class);



  •                 Scan scan = new Scan();



  •                 TableMapReduceUtil.initTableMapperJob(tablename,scan,doMapper.class, Text.class, IntWritable.class, job1);



  •                 job1.setReducerClass(WordCountReducer.class);



  •                 FileOutputFormat.setOutputPath(job1, new Path(args[0]));



  •                 MultipleOutputs.addNamedOutput(job1, "hdfs", TextOutputFormat.class, WritableComparable.class, Writable.class);







  •                 Job job2 = Job.getInstance(conf, "Topjiang");



  •                 job2.setJarByClass(HbaseTopJiang2.class);



  •                 job2.setMapperClass(KMap.class);



  •                 job2.setSortComparatorClass(IntKeyDescComparator.class);



  •                 job2.setPartitionerClass(KeySectionPartitioner.class);



  •                 job2.setReducerClass(SortIntValueReduce.class);



  •                 job2.setOutputKeyClass(IntWritable.class);



  •                 job2.setOutputValueClass(Text.class);



  •                 FileInputFormat.setInputPaths(job2, new Path(args[0]));



  •                 FileOutputFormat.setOutputPath(job2, new Path(args[1]));







  •                 //提交job1及job2,并等待完成



  •                 if (job1.waitForCompletion(true)) {



  •                         System.exit(job2.waitForCompletion(true) ? 0 : 1);



  •                 }



  •         }



  • }


[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac HbaseTopJiang2.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HbaseTopJiang2*class
[hadoop@h71 q1]$ hadoop jar xx.jar HbaseTopJiang2 /out /output


[hadoop@h71 q1]$ hadoop fs -ls /out
-rw-r--r--   2 hadoop supergroup          0 2017-03-18 19:02 /out/_SUCCESS
-rw-r--r--   2 hadoop supergroup         32 2017-03-18 19:02 /out/part-r-00000
[hadoop@h71 q1]$ hadoop fs -ls /output
-rw-r--r--   2 hadoop supergroup          0 2017-03-18 19:02 /output/_SUCCESS
-rw-r--r--   2 hadoop supergroup         25 2017-03-18 19:02 /output/part-r-00000


理想结果:
[hadoop@h71 q1]$ hadoop fs -cat /out/part-r-00000
hbase 1
hadoop hadoop 2
hello 6
hive hive 2
world world 3
[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000
hello 6
world world 3
hadoop hadoop,hive hive 2
(分隔符都为制表符)


我发现制表符(Tab键)从UltraEdit复制到SecureCRT正常,而从SecureCRT复制到UltraEdit则制表符会变成空格,也是醉了。。。



【转载】https://blog.csdn.net/m0_37739193/article/details/76091209


3 个回复

倒序浏览
奈斯
回复 使用道具 举报
回复 使用道具 举报
奈斯,加油加油
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马