A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

一、从Hbase表1中读取数据再把统计结果存到表2

在Hbase中建立相应的表1:



  • create 'hello','cf'



  • put 'hello','1','cf:hui','hello world'



  • put 'hello','2','cf:hui','hello hadoop'



  • put 'hello','3','cf:hui','hello hive'



  • put 'hello','4','cf:hui','hello hadoop'



  • put 'hello','5','cf:hui','hello world'



  • put 'hello','6','cf:hui','hello world'



java代码:


  • import java.io.IOException;



  • import java.util.Iterator;







  • import org.apache.hadoop.conf.Configuration;



  • import org.apache.hadoop.hbase.HBaseConfiguration;



  • import org.apache.hadoop.hbase.HColumnDescriptor;



  • import org.apache.hadoop.hbase.HTableDescriptor;



  • import org.apache.hadoop.hbase.client.HBaseAdmin;



  • import org.apache.hadoop.hbase.client.Put;



  • import org.apache.hadoop.hbase.client.Result;



  • import org.apache.hadoop.hbase.client.Scan;



  • import org.apache.hadoop.hbase.io.ImmutableBytesWritable;



  • import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;



  • import org.apache.hadoop.hbase.mapreduce.TableMapper;



  • import org.apache.hadoop.hbase.mapreduce.TableReducer;



  • import org.apache.hadoop.hbase.util.Bytes;



  • import org.apache.hadoop.io.IntWritable;



  • import org.apache.hadoop.io.NullWritable;



  • import org.apache.hadoop.io.Text;



  • import org.apache.hadoop.mapreduce.Job;







  • public class HBaseToHbase {  



  •         public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {  



  •                 String hbaseTableName1 = "hello";  



  •                 String hbaseTableName2 = "mytb2";  







  •                 prepareTB2(hbaseTableName2);  







  •                 Configuration conf = new Configuration();  







  •                 Job job = Job.getInstance(conf);  



  •                 job.setJarByClass(HBaseToHbase.class);  



  •                 job.setJobName("mrreadwritehbase");  







  •                 Scan scan = new Scan();  



  •                 scan.setCaching(500);  



  •                 scan.setCacheBlocks(false);  







  •                 TableMapReduceUtil.initTableMapperJob(hbaseTableName1, scan, doMapper.class, Text.class, IntWritable.class, job);  



  •                 TableMapReduceUtil.initTableReducerJob(hbaseTableName2, doReducer.class, job);  



  •                 System.exit(job.waitForCompletion(true) ? 1 : 0);  



  •         }  







  •         public static class doMapper extends TableMapper<Text, IntWritable>{  



  •                 private final static IntWritable one = new IntWritable(1);  



  •                 @Override  



  •                 protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {  



  •                         String rowValue = Bytes.toString(value.list().get(0).getValue());



  •                         context.write(new Text(rowValue), one);  



  •                 }  



  •         }  







  •         public static class doReducer extends TableReducer<Text, IntWritable, NullWritable>{  



  •                 @Override  



  •                 protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {  



  •                         System.out.println(key.toString());  



  •                         int sum = 0;  



  •                         Iterator<IntWritable> haha = values.iterator();  



  •                         while (haha.hasNext()) {  



  •                                 sum += haha.next().get();  



  •                         }  



  •                         Put put = new Put(Bytes.toBytes(key.toString()));  



  •                         put.add(Bytes.toBytes("mycolumnfamily"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));               



  •                         context.write(NullWritable.get(), put);  



  •                 }  



  •         }  







  •         public static void prepareTB2(String hbaseTableName) throws IOException{  



  •                 HTableDescriptor tableDesc = new HTableDescriptor(hbaseTableName);  



  •                 HColumnDescriptor columnDesc = new HColumnDescriptor("mycolumnfamily");  



  •                 tableDesc.addFamily(columnDesc);  



  •                 Configuration  cfg = HBaseConfiguration.create();  



  •                 HBaseAdmin admin = new HBaseAdmin(cfg);  



  •                 if (admin.tableExists(hbaseTableName)) {  



  •                         System.out.println("Table exists,trying drop and create!");  



  •                         admin.disableTable(hbaseTableName);  



  •                         admin.deleteTable(hbaseTableName);  



  •                         admin.createTable(tableDesc);  



  •                 } else {  



  •                         System.out.println("create table: "+ hbaseTableName);  



  •                         admin.createTable(tableDesc);  



  •                 }  



  •         }  



  • }  



在Linux中执行该代码:


  • [hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac HBaseToHbase.java



  • [hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HBaseToHbase*class



  • [hadoop@h71 q1]$ hadoop jar xx.jar HBaseToHbase



查看mytb2表:


  • hbase(main):009:0> scan 'mytb2'



  • ROW                                                          COLUMN+CELL                                                                                                                                                                     



  • hello hadoop                                                column=mycolumnfamily:count, timestamp=1489817182454, value=2                                                                                                                  



  • hello hive                                                  column=mycolumnfamily:count, timestamp=1489817182454, value=1                                                                                                                  



  • hello world                                                 column=mycolumnfamily:count, timestamp=1489817182454, value=3                                                                                                                  



  • 3 row(s) in 0.0260 seconds



二、从Hbase表1中读取数据再把结果存Hdfs中

1.将表1的内容不统计输出:



  • import java.io.IOException;







  • import org.apache.hadoop.conf.Configuration;



  • import org.apache.hadoop.fs.Path;



  • import org.apache.hadoop.hbase.HBaseConfiguration;



  • import org.apache.hadoop.hbase.client.Result;



  • import org.apache.hadoop.hbase.client.Scan;



  • import org.apache.hadoop.hbase.io.ImmutableBytesWritable;



  • import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;



  • import org.apache.hadoop.hbase.mapreduce.TableMapper;



  • import org.apache.hadoop.hbase.util.Bytes;



  • import org.apache.hadoop.io.NullWritable;



  • import org.apache.hadoop.io.Text;



  • import org.apache.hadoop.io.Writable;



  • import org.apache.hadoop.io.WritableComparable;



  • import org.apache.hadoop.mapreduce.Job;



  • import org.apache.hadoop.mapreduce.Reducer;



  • import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



  • import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;



  • import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;







  • public class HbaseToHdfs {  



  •         public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {  







  •           String tablename = "hello";



  •           Configuration conf = HBaseConfiguration.create();



  •           conf.set("hbase.zookeeper.quorum", "h71");



  •           Job job = new Job(conf, "WordCountHbaseReader");



  •           job.setJarByClass(HbaseToHdfs.class);



  •           Scan scan = new Scan();



  •           TableMapReduceUtil.initTableMapperJob(tablename,scan,doMapper.class, Text.class, Text.class, job);



  •           job.setReducerClass(WordCountHbaseReaderReduce.class);



  •           FileOutputFormat.setOutputPath(job, new Path(args[0]));



  •           MultipleOutputs.addNamedOutput(job, "hdfs", TextOutputFormat.class, WritableComparable.class, Writable.class);



  •           System.exit(job.waitForCompletion(true) ? 0 : 1);



  •         }  







  •         public static class doMapper extends TableMapper<Text, Text>{  



  •                 @Override  



  •                 protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {  



  •                         String rowValue = Bytes.toString(value.list().get(0).getValue());



  •                         context.write(new Text(rowValue), new Text("one"));  



  •                 }  



  •         }  







  •         public static class WordCountHbaseReaderReduce extends Reducer<Text,Text,Text,NullWritable>{



  •                 private Text result = new Text();



  •                 @Override



  •           protected void reduce(Text key, Iterable<Text> values,Context context) throws IOException, InterruptedException {



  •                         for(Text val:values){



  •                                 result.set(val);



  •                                 context.write(key, NullWritable.get());



  •                         }



  •                 }



  •         }



  • }  



在Linux中执行该代码:
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac HbaseToHdfs.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HbaseToHdfs*class
[hadoop@h71 q1]$ hadoop jar xx.jar HbaseToHdfs /output
注意:/output目录不能存在,如果存在就删除掉

[hadoop@h71 q1]$ hadoop fs -ls /output
Found 2 items
-rw-r--r--   2 hadoop supergroup          0 2017-03-18 14:28 /output/_SUCCESS
-rw-r--r--   2 hadoop supergroup         73 2017-03-18 14:28 /output/part-r-00000
[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000
hello hadoop
hello hadoop
hello hive
hello world
hello world
hello world


2.将表1的内容统计输出:



  • import java.io.IOException;







  • import org.apache.hadoop.conf.Configuration;



  • import org.apache.hadoop.fs.Path;



  • import org.apache.hadoop.hbase.HBaseConfiguration;



  • import org.apache.hadoop.hbase.client.Result;



  • import org.apache.hadoop.hbase.client.Scan;



  • import org.apache.hadoop.hbase.io.ImmutableBytesWritable;



  • import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;



  • import org.apache.hadoop.hbase.mapreduce.TableMapper;



  • import org.apache.hadoop.hbase.util.Bytes;



  • import org.apache.hadoop.io.IntWritable;



  • import org.apache.hadoop.io.Text;



  • import org.apache.hadoop.io.Writable;



  • import org.apache.hadoop.io.WritableComparable;



  • import org.apache.hadoop.mapreduce.Job;



  • import org.apache.hadoop.mapreduce.Reducer;



  • import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



  • import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;



  • import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;







  • public class HbaseToHdfs1 {  



  •         public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {  







  •           String tablename = "hello";



  •           Configuration conf = HBaseConfiguration.create();



  •           conf.set("hbase.zookeeper.quorum", "h71");



  •           Job job = new Job(conf, "WordCountHbaseReader");



  •           job.setJarByClass(HbaseToHdfs1.class);



  •           Scan scan = new Scan();



  •           TableMapReduceUtil.initTableMapperJob(tablename,scan,doMapper.class, Text.class, IntWritable.class, job);



  •           job.setReducerClass(WordCountHbaseReaderReduce.class);



  •           FileOutputFormat.setOutputPath(job, new Path(args[0]));



  •           MultipleOutputs.addNamedOutput(job, "hdfs", TextOutputFormat.class, WritableComparable.class, Writable.class);



  •           System.exit(job.waitForCompletion(true) ? 0 : 1);



  •         }  







  •         public static class doMapper extends TableMapper<Text, IntWritable>{  



  •                 private final static IntWritable one = new IntWritable(1);



  •                 private Text word = new Text();



  •                 @Override  



  •                 protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {



  •                 /*



  •                 String rowValue = Bytes.toString(value.list().get(0).getValue());



  •                           context.write(new Text(rowValue), one);



  •                   */



  •                   String[] rowValue = Bytes.toString(value.list().get(0).getValue()).split(" ");



  •                          for (String str: rowValue){



  •                          word.set(str);



  •                          context.write(word,one);



  •                         }



  •                 }  



  •         }  







  •         public static class WordCountHbaseReaderReduce extends Reducer<Text,IntWritable,Text,IntWritable>{



  •           @Override



  •                 public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {



  •                   int total=0;



  •                           for(IntWritable val:values){



  •                             total++;



  •                     }



  •                   context.write(key, new IntWritable(total));



  •           }



  •         }



  • }  



[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000
hadoop  2
hello   6
hive    1
world   3


三、读取Hdfs文件将统计结果存入到Hbase表中

创建文件并上传到Hdfs中:



  • [hadoop@h71 q1]$ vi hello.txt



  • hello world



  • hello hadoop



  • hello hive



  • hello hadoop



  • hello world



  • hello world



  • [hadoop@h71 q1]$ hadoop fs -mkdir /input



  • [hadoop@h71 q1]$ hadoop fs -put hello.txt /input



java代码:


  • import java.io.IOException;







  • import org.apache.hadoop.conf.Configuration;



  • import org.apache.hadoop.fs.Path;



  • import org.apache.hadoop.hbase.HBaseConfiguration;



  • import org.apache.hadoop.hbase.HColumnDescriptor;



  • import org.apache.hadoop.hbase.HTableDescriptor;



  • import org.apache.hadoop.hbase.client.HBaseAdmin;



  • import org.apache.hadoop.hbase.client.Put;



  • import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;



  • import org.apache.hadoop.hbase.mapreduce.TableReducer;



  • import org.apache.hadoop.hbase.util.Bytes;



  • import org.apache.hadoop.io.IntWritable;



  • import org.apache.hadoop.io.LongWritable;



  • import org.apache.hadoop.io.NullWritable;



  • import org.apache.hadoop.io.Text;



  • import org.apache.hadoop.mapreduce.Job;



  • import org.apache.hadoop.mapreduce.Mapper;



  • import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;



  • import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;







  • public class HdfsToHBase {







  •         public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {



  •                 private IntWritable i = new IntWritable(1);



  •                 @Override



  •                 public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {



  •                         String s[] = value.toString().trim().split("/n");



  •                         for (String m : s) {



  •                                 context.write(new Text(m), i);



  •                         }



  •                 }



  •         }







  •         public static class Reduce extends TableReducer<Text, IntWritable, NullWritable> {



  •                 @Override



  •           public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {



  •                         int sum = 0;



  •                         for (IntWritable i : values) {



  •                                 sum += i.get();



  •                         }



  •                         Put put = new Put(Bytes.toBytes(key.toString()));



  •                         // 列族为cf,列为count,列值为数目



  •                         put.add(Bytes.toBytes("cf"), Bytes.toBytes("count"), Bytes.toBytes(String.valueOf(sum)));



  •                         context.write(NullWritable.get(), put);



  •                 }



  •         }







  •         public static void createHBaseTable(String tableName) throws IOException {



  •                 HTableDescriptor htd = new HTableDescriptor(tableName);



  •                 HColumnDescriptor col = new HColumnDescriptor("cf");



  •                 htd.addFamily(col);



  •                 Configuration conf = HBaseConfiguration.create();



  •                 conf.set("hbase.zookeeper.quorum", "h71");



  •                 HBaseAdmin admin = new HBaseAdmin(conf);



  •                 if (admin.tableExists(tableName)) {



  •                         System.out.println("table exists, trying to recreate table......");



  •                         admin.disableTable(tableName);



  •                         admin.deleteTable(tableName);



  •                 }



  •                 System.out.println("create new table:" + tableName);



  •                 admin.createTable(htd);



  •         }







  •         public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {



  •                 //将结果存入hbase的表名



  •                 String tableName = "mytb2";



  •                 Configuration conf = new Configuration();



  •                 conf.set(TableOutputFormat.OUTPUT_TABLE, tableName);



  •                 createHBaseTable(tableName);



  •                 String input = args[0];



  •                 Job job = new Job(conf, "WordCount table with " + input);



  •                 job.setJarByClass(HdfsToHBase.class);



  •                 job.setNumReduceTasks(3);



  •                 job.setMapperClass(Map.class);



  •                 job.setReducerClass(Reduce.class);



  •                 job.setMapOutputKeyClass(Text.class);



  •                 job.setMapOutputValueClass(IntWritable.class);



  •                 job.setInputFormatClass(TextInputFormat.class);



  •                 job.setOutputFormatClass(TableOutputFormat.class);



  •                 FileInputFormat.addInputPath(job, new Path(input));



  • //        FileInputFormat.setInputPaths(job, new Path(input));        //这种方法也可以



  •                 System.exit(job.waitForCompletion(true) ? 0 : 1);



  •         }



  • }



[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac HdfsToHBase.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HdfsToHBase*class
[hadoop@h71 q1]$ hadoop jar xx.jar HdfsToHBase /input/hello.txt





  • hbase(main):011:0> scan 'mytb2'



  • ROW                                                          COLUMN+CELL                                                                                                                                                                     



  • hello hadoop                                                column=cf:count, timestamp=1489819702236, value=2                                                                                                                              



  • hello hive                                                  column=cf:count, timestamp=1489819702236, value=1                                                                                                                              



  • hello world                                                 column=cf:count, timestamp=1489819704448, value=3                                                                                                                              



  • 3 row(s) in 0.3260 seconds



四、从Hdfs到Hdfs(其实就是mapreduce的经典例子wordcount)

java代码:



  • import java.io.IOException;







  • import org.apache.hadoop.conf.Configuration;



  • import org.apache.hadoop.fs.Path;



  • import org.apache.hadoop.io.IntWritable;



  • import org.apache.hadoop.io.Text;



  • import org.apache.hadoop.mapreduce.Job;



  • import org.apache.hadoop.mapreduce.Mapper;



  • import org.apache.hadoop.mapreduce.Reducer;



  • import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;



  • import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;







  • public class HdfsToHdfs{







  •         public static class WordCountMapper extends Mapper<Object,Text,Text,IntWritable>{



  •                 private final static IntWritable one = new IntWritable(1);



  •                 private Text word = new Text();



  •                 @Override



  •                 public void map(Object key,Text value,Context context) throws IOException, InterruptedException {



  •                   String[] words = value.toString().split(" ");



  •                   for (String str: words){



  •                                 word.set(str);



  •                                 context.write(word,one);



  •                   }



  •                 }



  •         }     







  •         public static class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable> {



  •           @Override



  •           public void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {



  •                         int total=0;



  •                         for (IntWritable val : values){



  •                           total++;



  •                         }



  •                         context.write(key, new IntWritable(total));



  •                 }   



  •         }







  •         public static void main (String[] args) throws Exception{



  •           Configuration conf = new Configuration();







  •           Job job = new Job(conf, "word count");



  •           job.setJarByClass(HdfsToHdfs.class);



  •           job.setMapperClass(WordCountMapper.class);



  •           job.setReducerClass(WordCountReducer.class);



  •           job.setOutputKeyClass(Text.class);



  •           job.setOutputValueClass(IntWritable.class);







  •           FileInputFormat.setInputPaths(job, new Path(args[0]));



  •           FileOutputFormat.setOutputPath(job, new Path(args[1]));



  •           System.exit(job.waitForCompletion(true) ? 0 : 1);



  •         }



  • }



[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac HdfsToHdfs.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar HdfsToHdfs*class
[hadoop@h71 q1]$ hadoop jar xx.jar HdfsToHdfs /input/hello.txt /output


[hadoop@h71 q1]$ hadoop fs -cat /output/part-r-00000
hadoop  2
hello   6
hive    1
world   3


说明:我这个wordcount例子是hadoop2版本的,我的另一篇文章http://blog.csdn.net/m0_37739193/article/details/71132652里的是hadoop1版本的例子,在hadoop0.20.0及以后同时包含了两个版本的的API,所以两个版本的代码都能运行

Hadoop MapReduce新旧API区别:
        Hadoop的版本0.20.0包含有一个新的java MapReduce API,有时也称为"上下文对象"(context object),旨在使API在今后更容易扩展。新的API 在类型上不兼容先前的API,所以,需要重写以前的应用程序才能使新的API发挥作用。
        新的API倾向于使用抽象类,而不是接口,因为这更容易扩展。例如,可以无需修改类的实现而在抽象类中添加一个方法(即用默认的实现)。在新的API中, mapper和reducer现在都是抽象类。

--接口,严格的“协议约束”,只有方法声明而没有方法实现,要求所有实现类(抽象类除外)必须实现接口中的每个方法。

--抽象类,较宽松的“约束协议”,可为某些方法提供默认实现,而继承类则可选择是否重新实现这些方法。故而抽象类在类衍化方面更有优势,即具有良好的向后兼容性。
        新的API放在org.apache.hadoop.mapreduce包(和子包)中。之前版本的API依旧放在org.apache.hadoop.mapred中。
        新的API充分使用上下文对象,使用户代码能与MapReduce系统通信。例如,MapContext 基本具备了JobConf、OutputCollector和Reporter的功能。
        新的API同时支持"推"(push)和"拉"(pull)式的迭代。这两类API,均可以将键/值对记录推给mapper,但除此之外,新的API也允许把记录从map()方法中拉出。对reducer来说是一样的。"拉"式处理数据的好处是可以实现数据的批量处理,而非逐条记录地处理。
        新增的API实现了配置的统一。旧API通过一个特殊的JobConf对象配置作业,该对象是Hadoop配置对象的一个扩展。在新的API中,我们丢弃这种区分,所有作业的配置均通过Configuration来完成。
        新API中作业控制由Job类实现,而非JobClient类,新API中删除了JobClient类。
        输出文件的命名方式稍有不同。map的输出文件名为part-m-nnnnn,而reduce的输出为part-r-nnnnn(其中nnnnn表示分块序号,为整数,且从0开始算。
        将旧API写的Mapper和Reducer类转换为新API时,记住将map()和reduce()的签名转换为新形式。如果只是将类的继承修改为对新的Mapper和Reducer类的继承,编译的时候也不会报错或显示警告信息,因为新的Mapper和Reducer类同样也提供了等价的map()和reduce()函数。但是,自己写的mapper或reducer代码是不会被调用的,这会导致难以诊断的错误。



【转载】https://blog.csdn.net/m0_37739193/article/details/76053636


3 个回复

倒序浏览
奈斯
回复 使用道具 举报
回复 使用道具 举报
奈斯,加油加油
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马