A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

根据上一篇学习的MapReduce整体工作机制,然后练习一下写个WorldCount,

惯例写代码之前还是先梳理一下逻辑:

1、先写一个提供给map task用的类,

2、reduce task需要一个类来处理key相同的数据用,

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
       
        @Override
        protected void map(LongWritable key, Text value, Context context)
                        throws IOException, InterruptedException {

                // 切单词
                String line = value.toString();
                String[] words = line.split(" ");
                for(String word:words){
                        context.write(new Text(word), new IntWritable(1));
                }
        }
}
然后解析一下这个父类Mapper的参数:

* KEYIN :是map task读取到的数据的key的类型,是一行的起始偏移量Long
* VALUEIN:是map task读取到的数据的value的类型,是一行的内容String
*
* KEYOUT:是用户的自定义map方法要返回的结果kv数据的key的类型,在wordcount逻辑中,我们需要返回的是单词String
* VALUEOUT:是用户的自定义map方法要返回的结果kv数据的value的类型,在wordcount逻辑中,我们需要返回的是整数Integer
*
* 但是,在mapreduce中,map产生的数据需要传输给reduce,需要进行序列化和反序列化,而jdk中的原生序列化机制产生的数据量比较冗余,就会导致数据在mapreduce运行过程中传输效率低下
* 所以,hadoop专门设计了自己的序列化机制,那么,mapreduce中传输的数据类型就必须实现hadoop自己的序列化接口
*
* hadoop为jdk中的常用基本类型Long String Integer Float等数据类型封住了自己的实现了hadoop序列化接口的类型:LongWritable,Text,IntWritable,FloatWritable(自己对应吧)
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
       
       
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
                int count = 0;
               
                Iterator<IntWritable> iterator = values.iterator();
                while(iterator.hasNext()){
                       
                        IntWritable value = iterator.next();
                        count += value.get();
                }
                context.write(key, new IntWritable(count));
        }
}
在运行这些程序之前先搭一个yarn集群,之所以要搭这个集群我这里解释一下:

mapreduce程序应该是在很多机器上并行启动,而且先执行map task,当众多的maptask都处理完自己的数据后,还需要启动众多的reduce task,这个过程如果用用户自己手动调度不太现实,需要一个自动化的调度平台——hadoop中就为运行mapreduce之类的分布式运算程序开发了一个自动化调度平台——YARN

首先,为你的mapreduce程序开发一个提交job到yarn的客户端类(模板代码):

描述你的mapreduce程序运行时所需要的一些信息(比如用哪个mapper、reducer、map和reduce输出的kv类型、jar包所在路径、reduce task的数量、输入输出数据的路径)
将信息和整个工程的jar包一起交给yarn
然后,将整个工程(yarn客户端类+ mapreduce所有jar和自定义类)打成jar包

然后,将jar包上传到hadoop集群中的任意一台机器上

最后,运行jar包中的(YARN客户端类)

安装yarn集群

yarn集群中有两个角色:

主节点:Resource Manager  1台

从节点:Node Manager   N台

Resource Manager一般安装在一台专门的机器上

Node Manager应该与HDFS中的data node重叠在一起

yarn的安装包已经有了,在hadoop包里就有,现在只需要配置:

yarn-site.xml

<property>

<name>yarn.resourcemanager.hostname</name>

<value>hdp-04</value>

</property>

<property>

<name>yarn.nodemanager.aux-services</name>

<value>mapreduce_shuffle</value>

</property>

然后复制到每一台机器上

然后在hdp-04上,修改hadoop的slaves文件,列入要启动nodemanager的机器

然后将hdp-04到所有机器的免密登陆配置好

然后,就可以用脚本启动yarn集群:

不用脚本每起一台:{yarn-daemon.sh start resourcemanager}

查看剩余内存:free -m

sbin/start-yarn.sh

停止:

sbin/stop-yarn.sh

启动完成后,可以在windows上用浏览器访问resourcemanager的web端口:

http://hdp-04:8088

看resource mananger是否认出了所有的node manager节点



配置好yarn-site.xml

<property>

<name>yarn.nodemanager.resource.memory-mb</name>

<value>2048</value>               --------->最好在2G以上,因为在maptask和reducetask程序启动之前会启动一个主管程序(yarn app mapreduce am resource mb)

</property>

<property>

<name>yarn.nodemanager.resource.cpu-vcores</name>

<value>2</value>

</property>

以后再看:



yarn集群部署完就可以把MarReduce程序拿来运行了,先启动job客户端,job客户端会把MapReduce程序jar包发给yarn,job客户端会和yarn交互,写代码:

* 用于提交mapreduce job的客户端程序
* 功能:
*   1、封装本次job运行时所需要的必要参数
*   2、跟yarn进行交互,将mapreduce程序成功的启动、运行
public static void main(String[] args) throws Exception {
               
                // 在代码中设置JVM系统参数,用于给job对象来获取访问HDFS的用户身份
                System.setProperty("HADOOP_USER_NAME", "root");
               
               
                Configuration conf = new Configuration();
                // 1、设置job运行时要访问的默认文件系统
                conf.set("fs.defaultFS", "hdfs://hdp-01:9000");
                // 2、设置job提交到哪去运行
                conf.set("mapreduce.framework.name", "yarn");
                conf.set("yarn.resourcemanager.hostname", "hdp-01");
                // 3、如果要从windows系统上运行这个job提交客户端程序,则需要加这个跨平台提交的参数
                conf.set("mapreduce.app-submission.cross-platform","true");
               
                Job job = Job.getInstance(conf);
               
                // 1、封装参数:jar包所在的位置
                job.setJar("d:/wc.jar");
                //job.setJarByClass(JobSubmitter.class);
               
                // 2、封装参数: 本次job所要调用的Mapper实现类、Reducer实现类
                job.setMapperClass(WordcountMapper.class);
                job.setReducerClass(WordcountReducer.class);
               
                // 3、封装参数:本次job的Mapper实现类、Reducer实现类产生的结果数据的key、value类型
                job.setMapOutputKeyClass(Text.class);
                job.setMapOutputValueClass(IntWritable.class);
               
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);
               
               
               
                Path output = new Path("/wordcount/output");
                FileSystem fs = FileSystem.get(new URI("hdfs://hdp-01:9000"),conf,"root");
                if(fs.exists(output)){
                        fs.delete(output, true);
                }
               
                // 4、封装参数:本次job要处理的输入数据集所在路径、最终结果的输出路径
                FileInputFormat.setInputPaths(job, new Path("/wordcount/input"));
                FileOutputFormat.setOutputPath(job, output);  // 注意:输出路径必须不存在
               
               
                // 5、封装参数:想要启动的reduce task的数量
                job.setNumReduceTasks(2);
               
                // 6、提交job给yarn
                boolean res = job.waitForCompletion(true);
               
                System.exit(res?0:-1);
               
        }

---------------------
【转载,仅作分享,侵删】
作者:高辉  
原文:https://blog.csdn.net/ZJX103RLF/article/details/88966865
版权声明:本文为博主原创文章,转载请附上博文链接!

1 个回复

倒序浏览
奈斯,感谢分享!
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马