A股上市公司传智教育(股票代码 003032)旗下技术交流社区北京昌平校区

Hadoop安装写在最前面

如果集群开放了所有的端口(或者说,防火墙处于关闭或者失效状态),一定要屏蔽8088端口的进流量(不是出流量)

配置

按照/etc/profile文件里的目录路径配置hadoop文件的路径
修改yarn-site和mapreduce-site的参数
参考链接点这里

  • 修改core-site.xml
<configuration><property>    <name>hadoop.temp.dir</name>    <value>/root/hadoop/tmp</value>  </property>  <property>    <name>fs.defaultFS</name>    <value>hdfs://master:9000</value>  </property><!-- eclipse连接hive 的配置--><property>  <name>hadoop.proxyuser.root.hosts</name>  <value>*</value> </property> <property>  <name>hadoop.proxyuser.root.groups</name>  <value>*</value></property></configuration>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 修改hadoop-env.sh,把相对路径改为绝对路径
export JAVA_HOME=/opt/java/jdk1.8
  • 1
  • 修改hdfs-site.xml
<configuration> <property>    <name>dfs:replication</name>    <value>2</value>  </property>  <property>    <name>dfs.namenode.name.dir</name>    <value>/root/hadoop/name</value>  </property>  <property>    <name>dfs.datanode.data.dir</name>    <value>/root/hadoop/data</value>  </property></configuration>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 修改mapred-site.xml
<configuration><property>       <name>mapreduce.framework.name</name>       <value>yarn</value></property></configuration>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 修改yarn-site.xml
<configuration><!-- Site specific YARN configuration properties --><property>        <name>yarn.resourcemanager.hostname</name>        <value>master</value>   </property>   <property>        <name>yarn.resourcemanager.address</name>        <value>${yarn.resourcemanager.hostname}:8032</value>   </property>   <property>        <description>The address of the scheduler interface.</description>        <name>yarn.resourcemanager.scheduler.address</name>        <value>${yarn.resourcemanager.hostname}:8030</value>   </property>   <property>        <description>The http address of the RM web application.</description>        <name>yarn.resourcemanager.webapp.address</name>        <value>${yarn.resourcemanager.hostname}:8088</value>   </property>   <property>        <description>The https adddress of the RM web application.</description>        <name>yarn.resourcemanager.webapp.https.address</name>        <value>${yarn.resourcemanager.hostname}:8090</value>   </property>   <property>        <name>yarn.resourcemanager.resource-tracker.address</name>        <value>${yarn.resourcemanager.hostname}:8031</value>   </property>   <property>        <description>The address of the RM admin interface.</description>        <name>yarn.resourcemanager.admin.address</name>        <value>${yarn.resourcemanager.hostname}:8033</value>   </property>   <property>        <name>yarn.nodemanager.aux-services</name>        <value>mapreduce_shuffle</value>   </property>   <property>        <name>yarn.scheduler.maximum-allocation-mb</name>        <value>4096</value>        <discription>每个节点可用内存,单位MB,默认8182MB</discription>   </property>   <property>        <name>yarn.nodemanager.vmem-pmem-ratio</name>        <value>2.1</value>   </property>   <property>        <name>yarn.nodemanager.resource.memory-mb</name>        <value>2048</value></property>   <property>        <name>yarn.nodemanager.vmem-check-enabled</name>        <value>false</value></property></configuration>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 修改slaves
    vim slaves
slave1slave2
  • 1
  • 2
主从配置同步scp -r /opt/java root@slave1:/optscp -r /opt/java root@slave2:/optscp -r /opt/hadoop root@slave1:/optscp -r /opt/hadoop root@slave2:/optscp -r /etc/profile root@slave1:/etcscp -r /etc/profile root@slave2:/etcscp -r ~/.bashrc root@slave1:~/scp -r ~/.bashrc root@slave2:~/
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
启动

由于设置了aliases,依次执行hadoopfirststart,starthdfs,startyarn来启动集群,同时由于设置了ssh免密码登录,所以不需要再输入任何密码,只需要在必要的时候确认授权即可
不到万不得已,不要format hdfs
如果需要格式化hdfs,先删除/hadoop/tmp,var,name,dfs等文件夹
通过web端口http://134.175.xxx.xxx:50070来可视化检查。

使用hadoop安装后测试hadoop
  • 把本地的文件上传到hadoop的hdfs文件系统中,hadoop fs -put xxx hdfs://master:9000/(最后一个/不要省略,表示放在根目录下)
  • 将本地文件删除,rm -rf xxx;然后从hdfs文件系统中恢复文件。hadoop fs -get hdfs://master:9000/xxx(文件的路径)
  • 使用mapreduce跑hadoop自带的样例,例如计算圆周率。/share里面有个/hadoop/mapreduce,里面有个examples的jar包,使用hadoop jar(正常的java是使用java jar)hadoop jar hadoop-mapreduce-examples-2.8.4.jar pi 5 6(5和6是pi这个程序的参数)(5,6在腾讯云上跑不了,换成2,3,减少计算量)
  • 跑一个字符统计的程序,首先需要把数据放到集群上去,可以使用hadoop命令创建目录,hadoop fs -mkdir xxx,创建好的目录可以在web端进行访问,hadoop jar xxx wordcount /inputdir /outputdir
HDFS机制简介

(真简介,深入探究自行解决)

  • hdfs通过分布式集群存储文件,为客户端提供了一个web访问方式,提供了一个虚拟目录结构,例如在ip:50070上看到的目录实际上并不存在
  • 文件被存储到集群时会被切分为block
  • 文件的block存放在若干datanode上
  • hdfs文件系统中的文件与真实的block之间的映射关系由namenode管理
  • 可以配置每一个block的副本数量,提高并发能力和可靠性
  • 50070端口查看namenode状态http://ip:50070
  • 50075端口查看datanode状态
  • 50090端口查看secondarynamenode
  • 50030 端口,查看JobTracker
  • 50060 端口,查看TaskTracker
HDFS的shell命令
  • 查看帮助
hadoop fs -help
  • 1
  • 文件操作帮助
hadoop fs(故意写错让系统进行提示)
  • 1
  • 上传
hadoop fs -put PathOfLinux PathOfHDFS
  • 1
  • 查看文件内容
hadoop fs -cat PathOfHDFS
  • 1
  • 查看文件列表
hadoop fs -ls /
  • 1
  • 下载文件
hadoop fs -get PathOfHDFS PathOfLinux
  • 1
  • 修改文件权限
hadoop fs -chmod xxx FilePath
  • 1
NameNode元数据管理机制简介

NameNode 的作用是管理文件目录结构,是管理数据节点的。名字节点维护两套数据,一套是文件目录与数据块之间的关系,另一套是数据块与节点之间的关系。前一套数据是静态的,是存放在磁盘上的,通过fsimage 和edits 文件来维护;后一套数据是动态的,不持久化到磁盘的,每当集群启动的时候,会自动建立这些信息。

RPC机制
  • [ ] 挖坑,日后填
Mapreduce框架问题引入


map+reduce机制

上图用于统计超大文本中的qq.com数量,每个map统计一部分,汇总到reduce中。

MR的执行流程
  • 客户端提交一个mr的jar包给JobClient(提交方式:hadoop jar …)
  • JobClient通过RPC和JobTracker进行通信,返回一个存放jar包的地址(HDFS)和jobId
  • client将jar包写入到HDFS当中(path = hdfs上的地址 + jobId)
  • 开始提交任务(任务的描述信息,不是jar, 包括jobid,jar存放的位置,配置信息等等)
  • JobTracker进行初始化任务
  • 读取HDFS上的要处理的文件,开始计算输入分片,每一个分片对应一个MapperTask
  • TaskTracker通过心跳机制领取任务(任务的描述信息)
  • 下载所需的jar,配置文件等
  • TaskTracker启动一个java child子进程,用来执行具体的任务(MapperTask或ReducerTask)
  • 将结果写入到HDFS当中

原理示意图:

从图中可以看出来,reduce是在map执行之后才执行的,reduce需要等待所有的map都计算出了结果才能进行reduce(汇总,不是减少的意思)

实现hadoop包中自带的测试样例wordcount


1. 编写WCMapper

package cn.colony.hadoop.mr.wordcount;import java.io.IOException;import org.apache.commons.lang.StringUtils;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;//四个泛型前两个指定mapper输入数据的类型,KEYIN是输入的Key的类型,VALUEIN是输入的value的类型//map和reduce的数据输入输出都是用key-value对的形式进行的//默认情况下,框架传送给我们的mapper的输入数据中,key是要处理的文本中一行的起始偏移量,这一行的内容作为valuepublic class WCMapper extends Mapper<LongWritable,Text,Text,LongWritable>{    //mapreduce框架每读一行数据就调用一次这个方法    @Override    protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{        //具体的业务逻辑就写在这个函数中,函数所需要的数据已经由框架传递过来,在参数 key和value中        //key是要处理的文本中一行的起始偏移量,这一行的内容作为value        //将这一行的内容转换成String类型,将文本按照特定的字符进行切分        String line = value.toString();        String[] words = StringUtils.split(line, " ");        //遍历并发送给框架(数据从框架中来,送到框架中去)        for (String word : words){            context.write(new Text(word), new LongWritable(1));        }    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 编写WCReducer
package cn.colony.hadoop.mr.wordcount;import java.io.IOException;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{    //框架在map处理完成之后,将所有的key-value对缓存,然后传递一个组,调用一次reduce方法    //<hello,{1,1,1,.....,1}>    @Override    protected void reduce(Text key, Iterable<LongWritable> values, Context context)            throws IOException,InterruptedException{        long count = 0;        for (LongWritable value:values){            count += value.get();        }        //输出这一个单词的统计结果        context.write(key, new LongWritable(count));    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 编写WCRunner
package cn.colony.hadoop.mr.wordcount;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class WCRunner {    //用来描述一个特定的作业    //比如该作业使用哪个类作为逻辑处理中的map,哪些是reduce    //指定该作业要处理的数据所在的路径    //指定该作业输出的结果放到哪里    public static void main(String[] args) throws Exception{        Configuration conf = new Configuration();        Job wcjob = Job.getInstance(conf);        //设置整个job所使用的类在哪个jar包里        wcjob.setJarByClass(WCRunner.class);        //本job使用的map和reduce的类        wcjob.setMapperClass(WCMapper.class);        wcjob.setReducerClass(WCReducer.class);        //reduce的输出        wcjob.setOutputKeyClass(Text.class);        wcjob.setOutputValueClass(LongWritable.class);        wcjob.setMapOutputKeyClass(Text.class);        wcjob.setMapOutputValueClass(LongWritable.class);        //输入数据的路径        FileInputFormat.setInputPaths(wcjob, new Path("/wc/srcdata"));        //输出数据的路径        FileOutputFormat.setOutputPath(wcjob, new Path("/wc/output"));        //将job提交给集群        wcjob.waitForCompletion(true);    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

注意使用hadoop jar运行的时候必须指定主类的全路径

yarn框架YARN主体框架机制


步骤说明:
1. hadoop接收到wc.jar包准备执行wc.jar的代码,集群首先启动RunJar进程,该进程向resource manager申请一个job
2. resource manager响应申请,返回信息给RunJar进程,该信息包含该job相关资源的提交路径和一个唯一的jobID
3. RunJar接收到resource manager的响应后向hdfs提交相关资源
4. 提交完毕之后,RunJar会向resource manager汇报结果
5. resource manager将job加入任务队列(集群同时可能有很多job在运行)
6. node manager 根据心跳机制和resource manager 通信,当发现有新任务时,向resource manager领取任务
7. yarn框架为领取了任务的node manager分配运行资源,例如CPU,内存资源
8. yarn让resource manager在分配的资源区域内启动mapreduceAppMaster(MRAppMaster)用于管理mapreduce
9. node manager在启动了MRAppMaster之后向resource manager注册
10. MRAppManager在各个领取了任务的node manager上运行map task(yarn child进程),
11. map task完成之后MRAppMaster启动reduce task
12. 整个job完成之后,MRAppMaster向resource manager提交相关信息并注销自己

实现流量统计样例

这个样例的重点在于可以自定义一个数据结构,从而能够处理更复杂的信息

  • 自定义数据类型FlowBean.class
package cn.colony.hadoop.mr.flowsum;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.Writable;public class FlowBean implements Writable {    private String phoneNB;    private long upFlow;    private long downFlow;    private long sumFlow;    //在反序列化时,反射机制需要调用一个空参数的构造函数,所以需要显示定义    public FlowBean(){}    //为了对象初始化方便,加入一个带参数的构造函数    public FlowBean(String phoneNB, long upFlow, long downFlow) {        this.phoneNB = phoneNB;        this.upFlow = upFlow;        this.downFlow = downFlow;        this.sumFlow = upFlow + downFlow;    }    /**     * 从数据流中反序列化出对象的数据     */    @Override    public void readFields(DataInput in) throws IOException {        //按照序列化时的顺序进行反序列化        phoneNB = in.readUTF();        upFlow = in.readLong();        downFlow = in.readLong();        sumFlow = in.readLong();    }    /**     * 将对象数据序列化到流中,     * jdk中自带的序列化机制带有冗余(相对于hadoop而言)     * jdk中的序列化携带了类的继承信息,Hadoop值关心对象本身的信息,不携带类继承信息     */    @Override    public void write(DataOutput out) throws IOException {        out.writeUTF(phoneNB);        out.writeLong(upFlow);        out.writeLong(downFlow);        out.writeLong(sumFlow);    }    @Override    public String toString() {        return ""+upFlow+"\t"+downFlow+"\t"+sumFlow;    }    public String getPhoneNB() {        return phoneNB;    }    public void setPhoneNB(String phoneNB) {        this.phoneNB = phoneNB;    }    public long getUpFlow() {        return upFlow;    }    public void setUpFlow(long upFlow) {        this.upFlow = upFlow;    }    public long getDownFlow() {        return downFlow;    }    public void setDownFlow(long downFlow) {        this.downFlow = downFlow;    }    public long getSumFlow() {        return sumFlow;    }    public void setSumFlow(long sumFlow) {        this.sumFlow = sumFlow;    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 实现mapper
package cn.colony.hadoop.mr.flowsum;import java.io.IOException;import org.apache.commons.lang.StringUtils;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;/** * FlowBean是自定义的数据类型,要在hadoop之间各个节点时间传输,应该遵循hadoop序列化机制 * 就必须实现hadoop序列化的接口 * @author ColonyAlbert * */public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean>{    //读取日志中的一行数据,切分各个字段,抽取出需要的字段:手机号、上行流量、下行流量。    //封装成k-v对发送出去    @Override    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context)            throws IOException, InterruptedException {        String line = value.toString();        String[] fields = StringUtils.split(line,"\t");        //根据具体的数据定义字段的index,拿到字段        String phoneNB = fields[1];        long upFlow = Long.parseLong(fields[7]);        long downFlow = Long.parseLong(fields[8]);        //封装成k-v进行传输        //reducer就会收到一个phoneNB,接着好多FlowBean这样的数据        context.write(new Text(phoneNB), new FlowBean(phoneNB,upFlow,downFlow));    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 实现reducer
package cn.colony.hadoop.mr.flowsum;import java.io.IOException;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean>{    //框架每传递一组数据<12334234,{FlowBean,FlowBean,FlowBean...}>    //遍历values进行累加求和再输出    @Override    protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context)            throws IOException, InterruptedException {        long upFlowCounter = 0;        long downFlowCounter = 0;        for (FlowBean bean:values){            upFlowCounter += bean.getUpFlow();            downFlowCounter += bean.getDownFlow();        }        context.write(key, new FlowBean(key.toString(), upFlowCounter, downFlowCounter));    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 实现runner
package cn.colony.hadoop.mr.flowsum;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class FlowSumRunner extends Configured implements Tool{    @Override    public int run(String[] args) throws Exception {        Configuration conf = new Configuration();        Job job = Job.getInstance(conf);        job.setJarByClass(FlowSumRunner.class);        job.setMapperClass(FlowSumMapper.class);        job.setReducerClass(FlowSumReducer.class);        job.setMapOutputKeyClass(Text.class);        job.setMapOutputValueClass(FlowBean.class);        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(FlowBean.class);        FileInputFormat.setInputPaths(job, new Path(args[0]));        FileOutputFormat.setOutputPath(job, new Path(args[1]));        if (job.waitForCompletion(true) == true) return 0;        else return 1;    }    public static void main(String[] args) throws Exception{        int result = ToolRunner.run(new Configuration(), new FlowSumRunner(), args);        System.exit(result);    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

其余的步骤和wordcount没有区别,先上传jar包,再在hdfs集群里建立相关文件夹,使用hadoop jar运行jar包,再将结果get到本地查看。



【转载】。        https://blog.csdn.net/moquancsdn/article/details/81700133


2 个回复

正序浏览
奈斯
回复 使用道具 举报
您需要登录后才可以回帖 登录 | 加入黑马