A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

 找回密码
 加入黑马

QQ登录

只需一步,快速开始

前言:对两份数据data1和data2进行关键词连接是一个很通用的问题,在关系型数据库中Join是非常常见的操作,各种优化手段已经到了极致。在海量数据的环境下,不可避免的也会碰到这种类型的需求,例如在数据分析时需要从不同的数据源中获取数据。不同于传统的单机模式,在分布式存储下采用MapReduce编程模型,也有相应的处理措施和优化方法。


1.模拟数据:
[hadoop@h71 q1]$ vi mz.txt
zs 1
ls 2
ww 3
zl 2
qq 2
hh 1
[hadoop@h71 q1]$ vi jg.txt
1 beijing
2 tianjing
3 shanghai


2.将数据上传到hdfs上:
[hadoop@h71 q1]$ hadoop fs -mkdir /user/hadoop/m_in

[hadoop@h71 q1]$ hadoop fs -put mz.txt /user/hadoop/m_in
[hadoop@h71 q1]$ hadoop fs -put jg.txt /user/hadoop/m_in


3.[hadoop@h71 q1]$ vi MTjoin.java




  • import java.io.IOException;



  • import java.util.Iterator;



  • import java.util.StringTokenizer;







  • import org.apache.hadoop.conf.Configuration;



  • import org.apache.hadoop.fs.Path;



  • import org.apache.hadoop.io.Text;



  • import org.apache.hadoop.mapreduce.Job;



  • import org.apache.hadoop.mapreduce.Mapper;



  • import org.apache.hadoop.mapreduce.Reducer;



  • import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;



  • import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;



  • import org.apache.hadoop.util.GenericOptionsParser;







  • public class MTjoin {







  •     public static int time = 0;



  •     /*



  •      * 在map中先区分输入行属于左表还是右表,然后对两列值进行分割,



  •      * 保存连接列在key值,剩余列和左右表标志在value中,最后输出



  •      */



  •     public static class Map extends Mapper<Object, Text, Text, Text> {



  •         // 实现map函数



  •         public void map(Object key, Text value, Context context) throws IOException, InterruptedException {



  •             String line = value.toString();// 每行文件



  •             String relationtype = new String();// 左右表标识



  •             // 输入文件首行,不处理



  •             if (line.contains("name") == true || line.contains("addressed") == true) {



  •                 return;



  •             }



  •             // 输入的一行预处理文本



  •             StringTokenizer itr = new StringTokenizer(line);



  •             String mapkey = new String();



  •             String mapvalue = new String();



  •             int i = 0;



  •             while (itr.hasMoreTokens()) {



  •                 // 先读取一个单词



  •                 String token = itr.nextToken();



  •                 // 判断该地址ID就把存到"values[0]"



  •                 //charAt() 方法可返回指定位置的字符



  •                 if (token.charAt(0) >= '0' && token.charAt(0) <= '9') {



  •                     mapkey = token;



  •                     if (i > 0) {



  •                         relationtype = "1";



  •                     } else {



  •                         relationtype = "2";



  •                     }



  •                     //break的作用是跳出当前循环块,continue用于结束循环体中其后语句的执行



  •                     continue;



  •                 }



  •                 // 存工厂名



  •                 mapvalue += token + " ";



  •                 i++;



  •             }



  •             // 输出左右表



  •             context.write(new Text(mapkey), new Text(relationtype + "+"+ mapvalue));



  •         }



  •     }



  •     /*



  •      * reduce解析map输出,将value中数据按照左右表分别保存,



  •            * 然后求出笛卡尔积,并输出。



  •      */



  •     public static class Reduce extends Reducer<Text, Text, Text, Text> {



  •         // 实现reduce函数



  •         public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {



  •             // 输出表头



  •             if (0 == time) {



  •                 context.write(new Text("name"), new Text("address"));



  •                 time++;



  •             }



  •             int factorynum = 0;



  •             String[] factory = new String[10];



  •             int addressnum = 0;



  •             String[] address = new String[10];



  •             Iterator ite = values.iterator();



  •             while (ite.hasNext()) {



  •                 String record = ite.next().toString();



  •                 int len = record.length();



  •                 int i = 2;



  •                 if (0 == len) {



  •                     continue;



  •                 }



  •                 // 取得左右表标识



  •                 char relationtype = record.charAt(0);



  •                 // 左表



  •                 if ('1' == relationtype) {



  •                     factory[factorynum] = record.substring(i);



  •                     factorynum++;



  •                 }



  •                 // 右表



  •                 if ('2' == relationtype) {



  •                     address[addressnum] = record.substring(i);



  •                     addressnum++;



  •                 }



  •             }



  •             // 求笛卡尔积



  •             if (0 != factorynum && 0 != addressnum) {



  •                 for (int m = 0; m < factorynum; m++) {



  •                     for (int n = 0; n < addressnum; n++) {



  •                         // 输出结果



  •                         context.write(new Text(factory[m]),



  •                                 new Text(address[n]));



  •                     }



  •                 }



  •             }



  •         }



  •     }







  •     public static void main(String[] args) throws Exception {



  •         Configuration conf = new Configuration();



  •         // 这句话很关键



  •         conf.set("mapred.jar", "mt.jar");



  •         String[] ioArgs = new String[] { "m_in", "m_out" };



  •         String[] otherArgs = new GenericOptionsParser(conf, ioArgs).getRemainingArgs();



  •         if (otherArgs.length != 2) {



  •             System.err.println("Usage: Multiple Table Join <in> <out>");



  •             System.exit(2);



  •         }



  •         Job job = new Job(conf, "Multiple Table Join");



  •         job.setJarByClass(MTjoin.class);







  •         // 设置Map和Reduce处理类



  •         job.setMapperClass(Map.class);



  •         job.setReducerClass(Reduce.class);







  •         // 设置输出类型



  •         job.setOutputKeyClass(Text.class);



  •         job.setOutputValueClass(Text.class);







  •         // 设置输入和输出目录



  •         FileInputFormat.addInputPath(job, new Path(otherArgs[0]));



  •         FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));



  •         System.exit(job.waitForCompletion(true) ? 0 : 1);



  •     }



  • }



4.执行:
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac MTjoin.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/jar cvf xx.jar MTjoin*class
[hadoop@h71 q1]$ hadoop jar xx.jar MTjoin


5.查看结果:
[hadoop@h71 q1]$ hadoop fs -cat /user/hadoop/m_out/part-r-00000
name    address
hh      beijing
zs      beijing
qq      tianjing
zl      tianjing
ls      tianjing
ww      shanghai



【转载】 https://blog.csdn.net/m0_37739193/article/details/76572717


3 个回复

倒序浏览
奈斯
回复 使用道具 举报
回复 使用道具 举报
奈斯,加油加油
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马