|
Hadoop安装写在最前面 如果集群开放了所有的端口(或者说,防火墙处于关闭或者失效状态),一定要屏蔽8088端口的进流量(不是出流量) 配置按照/etc/profile文件里的目录路径配置hadoop文件的路径
修改yarn-site和mapreduce-site的参数
参考链接点这里 <configuration><property> <name>hadoop.temp.dir</name> <value>/root/hadoop/tmp</value> </property> <property> <name>fs.defaultFS</name> <value>hdfs://master:9000</value> </property><!-- eclipse连接hive 的配置--><property> <name>hadoop.proxyuser.root.hosts</name> <value>*</value> </property> <property> <name>hadoop.proxyuser.root.groups</name> <value>*</value></property></configuration>- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 修改hadoop-env.sh,把相对路径改为绝对路径
export JAVA_HOME=/opt/java/jdk1.8<configuration> <property> <name>dfs:replication</name> <value>2</value> </property> <property> <name>dfs.namenode.name.dir</name> <value>/root/hadoop/name</value> </property> <property> <name>dfs.datanode.data.dir</name> <value>/root/hadoop/data</value> </property></configuration><configuration><property> <name>mapreduce.framework.name</name> <value>yarn</value></property></configuration><configuration><!-- Site specific YARN configuration properties --><property> <name>yarn.resourcemanager.hostname</name> <value>master</value> </property> <property> <name>yarn.resourcemanager.address</name> <value>${yarn.resourcemanager.hostname}:8032</value> </property> <property> <description>The address of the scheduler interface.</description> <name>yarn.resourcemanager.scheduler.address</name> <value>${yarn.resourcemanager.hostname}:8030</value> </property> <property> <description>The http address of the RM web application.</description> <name>yarn.resourcemanager.webapp.address</name> <value>${yarn.resourcemanager.hostname}:8088</value> </property> <property> <description>The https adddress of the RM web application.</description> <name>yarn.resourcemanager.webapp.https.address</name> <value>${yarn.resourcemanager.hostname}:8090</value> </property> <property> <name>yarn.resourcemanager.resource-tracker.address</name> <value>${yarn.resourcemanager.hostname}:8031</value> </property> <property> <description>The address of the RM admin interface.</description> <name>yarn.resourcemanager.admin.address</name> <value>${yarn.resourcemanager.hostname}:8033</value> </property> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> <property> <name>yarn.scheduler.maximum-allocation-mb</name> <value>4096</value> <discription>每个节点可用内存,单位MB,默认8182MB</discription> </property> <property> <name>yarn.nodemanager.vmem-pmem-ratio</name> <value>2.1</value> </property> <property> <name>yarn.nodemanager.resource.memory-mb</name> <value>2048</value></property> <property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value></property></configuration>- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
slave1slave2主从配置同步scp -r /opt/java root@slave1:/optscp -r /opt/java root@slave2:/optscp -r /opt/hadoop root@slave1:/optscp -r /opt/hadoop root@slave2:/optscp -r /etc/profile root@slave1:/etcscp -r /etc/profile root@slave2:/etcscp -r ~/.bashrc root@slave1:~/scp -r ~/.bashrc root@slave2:~/启动由于设置了aliases,依次执行hadoopfirststart,starthdfs,startyarn来启动集群,同时由于设置了ssh免密码登录,所以不需要再输入任何密码,只需要在必要的时候确认授权即可
不到万不得已,不要format hdfs
如果需要格式化hdfs,先删除/hadoop/tmp,var,name,dfs等文件夹
通过web端口http://134.175.xxx.xxx:50070来可视化检查。 使用hadoop安装后测试hadoop- 把本地的文件上传到hadoop的hdfs文件系统中,hadoop fs -put xxx hdfs://master:9000/(最后一个/不要省略,表示放在根目录下)
- 将本地文件删除,rm -rf xxx;然后从hdfs文件系统中恢复文件。hadoop fs -get hdfs://master:9000/xxx(文件的路径)
- 使用mapreduce跑hadoop自带的样例,例如计算圆周率。/share里面有个/hadoop/mapreduce,里面有个examples的jar包,使用hadoop jar(正常的java是使用java jar)hadoop jar hadoop-mapreduce-examples-2.8.4.jar pi 5 6(5和6是pi这个程序的参数)(5,6在腾讯云上跑不了,换成2,3,减少计算量)
- 跑一个字符统计的程序,首先需要把数据放到集群上去,可以使用hadoop命令创建目录,hadoop fs -mkdir xxx,创建好的目录可以在web端进行访问,hadoop jar xxx wordcount /inputdir /outputdir
HDFS机制简介(真简介,深入探究自行解决) - hdfs通过分布式集群存储文件,为客户端提供了一个web访问方式,提供了一个虚拟目录结构,例如在ip:50070上看到的目录实际上并不存在
- 文件被存储到集群时会被切分为block
- 文件的block存放在若干datanode上
- hdfs文件系统中的文件与真实的block之间的映射关系由namenode管理
- 可以配置每一个block的副本数量,提高并发能力和可靠性
- 50070端口查看namenode状态http://ip:50070
- 50075端口查看datanode状态
- 50090端口查看secondarynamenode
- 50030 端口,查看JobTracker
- 50060 端口,查看TaskTracker
HDFS的shell命令hadoop fs -helphadoop fs(故意写错让系统进行提示)hadoop fs -put PathOfLinux PathOfHDFShadoop fs -cat PathOfHDFShadoop fs -ls /hadoop fs -get PathOfHDFS PathOfLinuxhadoop fs -chmod xxx FilePathNameNode元数据管理机制简介NameNode 的作用是管理文件目录结构,是管理数据节点的。名字节点维护两套数据,一套是文件目录与数据块之间的关系,另一套是数据块与节点之间的关系。前一套数据是静态的,是存放在磁盘上的,通过fsimage 和edits 文件来维护;后一套数据是动态的,不持久化到磁盘的,每当集群启动的时候,会自动建立这些信息。 RPC机制Mapreduce框架问题引入
map+reduce机制
上图用于统计超大文本中的qq.com数量,每个map统计一部分,汇总到reduce中。 MR的执行流程- 客户端提交一个mr的jar包给JobClient(提交方式:hadoop jar …)
- JobClient通过RPC和JobTracker进行通信,返回一个存放jar包的地址(HDFS)和jobId
- client将jar包写入到HDFS当中(path = hdfs上的地址 + jobId)
- 开始提交任务(任务的描述信息,不是jar, 包括jobid,jar存放的位置,配置信息等等)
- JobTracker进行初始化任务
- 读取HDFS上的要处理的文件,开始计算输入分片,每一个分片对应一个MapperTask
- TaskTracker通过心跳机制领取任务(任务的描述信息)
- 下载所需的jar,配置文件等
- TaskTracker启动一个java child子进程,用来执行具体的任务(MapperTask或ReducerTask)
- 将结果写入到HDFS当中
原理示意图:
从图中可以看出来,reduce是在map执行之后才执行的,reduce需要等待所有的map都计算出了结果才能进行reduce(汇总,不是减少的意思) 实现hadoop包中自带的测试样例wordcount![]()
1. 编写WCMapper package cn.colony.hadoop.mr.wordcount;import java.io.IOException;import org.apache.commons.lang.StringUtils;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;//四个泛型前两个指定mapper输入数据的类型,KEYIN是输入的Key的类型,VALUEIN是输入的value的类型//map和reduce的数据输入输出都是用key-value对的形式进行的//默认情况下,框架传送给我们的mapper的输入数据中,key是要处理的文本中一行的起始偏移量,这一行的内容作为valuepublic class WCMapper extends Mapper<LongWritable,Text,Text,LongWritable>{ //mapreduce框架每读一行数据就调用一次这个方法 @Override protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{ //具体的业务逻辑就写在这个函数中,函数所需要的数据已经由框架传递过来,在参数 key和value中 //key是要处理的文本中一行的起始偏移量,这一行的内容作为value //将这一行的内容转换成String类型,将文本按照特定的字符进行切分 String line = value.toString(); String[] words = StringUtils.split(line, " "); //遍历并发送给框架(数据从框架中来,送到框架中去) for (String word : words){ context.write(new Text(word), new LongWritable(1)); } }}- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
package cn.colony.hadoop.mr.wordcount;import java.io.IOException;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;public class WCReducer extends Reducer<Text, LongWritable, Text, LongWritable>{ //框架在map处理完成之后,将所有的key-value对缓存,然后传递一个组,调用一次reduce方法 //<hello,{1,1,1,.....,1}> @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException,InterruptedException{ long count = 0; for (LongWritable value:values){ count += value.get(); } //输出这一个单词的统计结果 context.write(key, new LongWritable(count)); }}- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
package cn.colony.hadoop.mr.wordcount;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class WCRunner { //用来描述一个特定的作业 //比如该作业使用哪个类作为逻辑处理中的map,哪些是reduce //指定该作业要处理的数据所在的路径 //指定该作业输出的结果放到哪里 public static void main(String[] args) throws Exception{ Configuration conf = new Configuration(); Job wcjob = Job.getInstance(conf); //设置整个job所使用的类在哪个jar包里 wcjob.setJarByClass(WCRunner.class); //本job使用的map和reduce的类 wcjob.setMapperClass(WCMapper.class); wcjob.setReducerClass(WCReducer.class); //reduce的输出 wcjob.setOutputKeyClass(Text.class); wcjob.setOutputValueClass(LongWritable.class); wcjob.setMapOutputKeyClass(Text.class); wcjob.setMapOutputValueClass(LongWritable.class); //输入数据的路径 FileInputFormat.setInputPaths(wcjob, new Path("/wc/srcdata")); //输出数据的路径 FileOutputFormat.setOutputPath(wcjob, new Path("/wc/output")); //将job提交给集群 wcjob.waitForCompletion(true); }}- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
注意使用hadoop jar运行的时候必须指定主类的全路径 yarn框架YARN主体框架机制![]()
步骤说明:
1. hadoop接收到wc.jar包准备执行wc.jar的代码,集群首先启动RunJar进程,该进程向resource manager申请一个job
2. resource manager响应申请,返回信息给RunJar进程,该信息包含该job相关资源的提交路径和一个唯一的jobID
3. RunJar接收到resource manager的响应后向hdfs提交相关资源
4. 提交完毕之后,RunJar会向resource manager汇报结果
5. resource manager将job加入任务队列(集群同时可能有很多job在运行)
6. node manager 根据心跳机制和resource manager 通信,当发现有新任务时,向resource manager领取任务
7. yarn框架为领取了任务的node manager分配运行资源,例如CPU,内存资源
8. yarn让resource manager在分配的资源区域内启动mapreduceAppMaster(MRAppMaster)用于管理mapreduce
9. node manager在启动了MRAppMaster之后向resource manager注册
10. MRAppManager在各个领取了任务的node manager上运行map task(yarn child进程),
11. map task完成之后MRAppMaster启动reduce task
12. 整个job完成之后,MRAppMaster向resource manager提交相关信息并注销自己
![]() 实现流量统计样例这个样例的重点在于可以自定义一个数据结构,从而能够处理更复杂的信息 package cn.colony.hadoop.mr.flowsum;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;import org.apache.hadoop.io.Writable;public class FlowBean implements Writable { private String phoneNB; private long upFlow; private long downFlow; private long sumFlow; //在反序列化时,反射机制需要调用一个空参数的构造函数,所以需要显示定义 public FlowBean(){} //为了对象初始化方便,加入一个带参数的构造函数 public FlowBean(String phoneNB, long upFlow, long downFlow) { this.phoneNB = phoneNB; this.upFlow = upFlow; this.downFlow = downFlow; this.sumFlow = upFlow + downFlow; } /** * 从数据流中反序列化出对象的数据 */ @Override public void readFields(DataInput in) throws IOException { //按照序列化时的顺序进行反序列化 phoneNB = in.readUTF(); upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); } /** * 将对象数据序列化到流中, * jdk中自带的序列化机制带有冗余(相对于hadoop而言) * jdk中的序列化携带了类的继承信息,Hadoop值关心对象本身的信息,不携带类继承信息 */ @Override public void write(DataOutput out) throws IOException { out.writeUTF(phoneNB); out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } @Override public String toString() { return ""+upFlow+"\t"+downFlow+"\t"+sumFlow; } public String getPhoneNB() { return phoneNB; } public void setPhoneNB(String phoneNB) { this.phoneNB = phoneNB; } public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDownFlow() { return downFlow; } public void setDownFlow(long downFlow) { this.downFlow = downFlow; } public long getSumFlow() { return sumFlow; } public void setSumFlow(long sumFlow) { this.sumFlow = sumFlow; }}- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
package cn.colony.hadoop.mr.flowsum;import java.io.IOException;import org.apache.commons.lang.StringUtils;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;/** * FlowBean是自定义的数据类型,要在hadoop之间各个节点时间传输,应该遵循hadoop序列化机制 * 就必须实现hadoop序列化的接口 * @author ColonyAlbert * */public class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean>{ //读取日志中的一行数据,切分各个字段,抽取出需要的字段:手机号、上行流量、下行流量。 //封装成k-v对发送出去 @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = StringUtils.split(line,"\t"); //根据具体的数据定义字段的index,拿到字段 String phoneNB = fields[1]; long upFlow = Long.parseLong(fields[7]); long downFlow = Long.parseLong(fields[8]); //封装成k-v进行传输 //reducer就会收到一个phoneNB,接着好多FlowBean这样的数据 context.write(new Text(phoneNB), new FlowBean(phoneNB,upFlow,downFlow)); }}- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
package cn.colony.hadoop.mr.flowsum;import java.io.IOException;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;public class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean>{ //框架每传递一组数据<12334234,{FlowBean,FlowBean,FlowBean...}> //遍历values进行累加求和再输出 @Override protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException { long upFlowCounter = 0; long downFlowCounter = 0; for (FlowBean bean:values){ upFlowCounter += bean.getUpFlow(); downFlowCounter += bean.getDownFlow(); } context.write(key, new FlowBean(key.toString(), upFlowCounter, downFlowCounter)); }}- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
package cn.colony.hadoop.mr.flowsum;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.conf.Configured;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import org.apache.hadoop.util.Tool;import org.apache.hadoop.util.ToolRunner;public class FlowSumRunner extends Configured implements Tool{ @Override public int run(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(FlowSumRunner.class); job.setMapperClass(FlowSumMapper.class); job.setReducerClass(FlowSumReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); if (job.waitForCompletion(true) == true) return 0; else return 1; } public static void main(String[] args) throws Exception{ int result = ToolRunner.run(new Configuration(), new FlowSumRunner(), args); System.exit(result); }}- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
其余的步骤和wordcount没有区别,先上传jar包,再在hdfs集群里建立相关文件夹,使用hadoop jar运行jar包,再将结果get到本地查看。
【转载】。 https://blog.csdn.net/moquancsdn/article/details/81700133
|
|