1.Hdfs写入 Hdfs写入的体系结构
代码: @Test
public void putFile() throws Exception{
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.200.10:8020/");
FileSystem fs = FileSystem.get(conf) ;
FSDataOutputStream out = fs.create(new Path("/user/zpx/data/a.txt"));
out.write("helloworld".getBytes());
out.close();
}
从Hdfs写入的结构中我们可以看到
第一,Hdfs在进行写入的时候,客户端通过调用DistributedFileSytem中的create()函数对象创建一个文件。DistributedFileSystem是抽象类FileSystem的一个实例。DistributedFileSystem通过RPC调用在NameNode的文件系统命名空间中创建一个新文件(创建新文件的格式在edits中,下面分析),此时还没有相关的DataNode与之关联。 第二,NameNode会通过多种验证保证新的文件不存在文件系统中,并且确保请求客户端拥有创建文件的权限(如果没有,通过 hdfs dfs -chmod o+w /user/zpx 来修改)。当所有的验证同通过时,NameNode会创建一个新文件的记录,如果创建失败,则抛出一个IOException异常,如果成功,则DistributedFileSytem通过掉用create()函数返回一个HdfsDataOutputStream输出流来供客户端用来写入数据。 第三,在调用write()函数写入数据的时候,内部通过FSDataOutputStream(继承自DataOutputStream)的装饰设计模式封装了DFSOutputStream来处理DataNode与NameNode之间的通信,并且把数据写入到本地缓冲区,如果缓冲区已经满了,则掉调用flushBuffer()函数,否则存储在本地缓冲区。 第四,将数据写入到缓冲区之后,客户端调用close()函数,将缓冲区的数据通过DFSOutputStream调用flushBuffer()函数将数据拆分成包packet的形式,如下图:
前33个字节为packet的校验字节,因为每个包包括126个chunck,所以会生成126*4个chunck的校验,然后后面的字节用来存放文件的数据。最后会在末尾添加一个空的包,用来表示此数据包的结束。然后将每一个包都放入到一个内部队列,称为“数据队列”。 第五,DataStreamer会将这些小的文件包放入到数据流中,DataStreamer的作用是请求NameNode为新的文件包分配合适的DataNode存放数据副本(根据上一篇说的在配置文件hdfs-site.xml中获得配置的副本数)。返回的的DataNode列表形成一个“管道”,假设这里的副本数是3,那么这个管道中就会有3个DataNode。DataStreamer将文件包以流的方式传送给队列中的第一个DataNode。第一个DataNode会存储这个包,然后将他推送到第二个DataNode中,随后照这样进行,直到管道中的最后一个DataNode。 第六,在DFSOutputStream同时会保存一个包的内部队列,用来等待管道中的DataNode返回确认信息,这个队列被称作“确认队列”。只有当所有管道中的DataNode都返回了写入成功返回信息文件包之后,才会从确认队列中删除。当然,如果Hdfs也会考虑写入失败的情况,当数据写入失败的时候,Hdfs会做出如下反应:首先会关闭管道,任何在确认队列中的文件包都会被添加到数据队列的前端,这样就能保证管道中失败的DataNode都不会丢失数据。当前存放于正常的工作DataNode之上的文件块会被赋予一个新的身份,并且和NameNode进行关联。这样,如果失败的DataNode过段时间从故障中恢复出来,其中的部分数据块就会从这个发生故障的数据节点DataNode中删除,然后管道会删掉这个失败的DataNode,文件会继续被写到管道中的另外的DataNode中。最后NameNode会注意到现在的文件块副本没有达到配置属性要求,会在另外正常工作的DataNode上重新安排创建副本,直到达到满足配置要求的副本数。随后文件会正常执行写入操作。
2.Hdfs读取 @Test
public void readFileByAPI2() throws Exception{
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.200.10:8020/");
FileSystem fs = FileSystem.get(conf) ;
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Path p = new Path("/user/zpx/hadoop/xcall.sh");
FSDataInputStream fis = fs.open(p);
IOUtils.copyBytes(fis, baos, 1024);
System.out.println(new String(baos.toByteArray()));
}
第一,客户端通过调用客户端通过调用DistributedFileSytem中的open()函数来读取它所需要的数据。DistributedFileSystem是抽象类FileSystem的一个实例。DistributedFileSystem通过RPC调用在NameNode来确定请求文件块所在的位置。需要注意,NameNode只会返回所调用文件中开始的几个块,而不是全部返回,对于每个返回的块,都包含块所在的DataNode地址。随后,这些返回的DataNode会按照Hadoop定义的集群拓扑结构来得出客户端的距离,然后再进行排序。如果客户端本身就是存储数据的节点,那么直接读取数据就好。 第二,DistributedFileSytem会返回个客户端一个支持文件定位的FSDataInputStream,用于客户端来读取数据,其中包含了一个DFSInputStream对象,用来管理DataNode和NameNode之间的I/O。客户端在这个输入流上调用read()函数。DFSInputStream对象中包含文件开始部分数据块所在的DataNode地址,然后连接包含文件第一个数据块最近的节点DataNode。反复在数据流中使用read()函数来读取数据,直到这个数据块的数据全部被读完。当当前节点的数据块被读取完成之后,DFSInputStream会关闭连接,调用NamenNode来查找下一个数据块距离客户端最近的DataNode,依次步骤下去,直到读取到最后一个数据块。当完成所有的数据 第三,当数据块全部读取完毕之后,客户端会在FSDataInputStream中调用close()函数 第四,如果客户端和所连接的DataNode读取时出现故障,那么会尝试连接存储当前数据块的下一个离客户端最近的DataNode节点,同时记录当前出现故障的节点,下次不再连接读取。在读取DataNode的数据块的时候,客户端还会验证从DataNode传送过来的数据校验和,如果发现数据块损坏,那么客户端将尝试从别的DataNode读取数据块,并且向NameNodr报告这个消息,NameNode也会更新保存的文件信息。
3.编辑日志(edits)与镜像文件(fsimage) 我们根据前面的配置文件core-site.xml来找到hadoop的NameNode数据目录,然后找到current目录
我们发现当下目录下包括VERSION,edits,fsimage 1.VERSION
记录了当前集群的版本号,集群中的所有DataNode的版本号与其一致,不然集群启动不成功,需要格式化到同一个版本下,查看一下DataNode存储数据的文件夹下就可以看到,当前的版本号与NameNode一致,集群才能正常运行
其中storageID是用于在NameNode处标识DataNode。storageType将这个目录标志为DataNode数据存储目录 ,VERSION上边的文件夹是用来存储在数据块
2.edits
edits包括两类,edits_XXX,edits_inprogress_XXX 2.1edits_XXX:保存文件系统的操作 使用命令hdfs oiv -i edits_xxx -o b.xml -p XML来查询内容 <?xml version="1.0" encoding="UTF-8"?>
-<EDITS>
<EDITS_VERSION>-63</EDITS_VERSION>
-<RECORD>
<OPCODE>OP_START_LOG_SEGMENT</OPCODE>
-<DATA>
<TXID>7170</TXID>
</DATA>
</RECORD>
-<RECORD>
<OPCODE>OP_ADD</OPCODE>
-<DATA>
<TXID>7171</TXID>
<LENGTH>0</LENGTH>
<INODEID>17787</INODEID>
<PATH>/user/zpx/a.txt</PATH>
<REPLICATION>3</REPLICATION>
<MTIME>1489118864779</MTIME>
<ATIME>1489118864779</ATIME>
<BLOCKSIZE>数据的大小</BLOCKSIZE>
<CLIENT_NAME>DFSClient_NONMAPREDUCE_1295720148_1</CLIENT_NAME>
<CLIENT_MACHINE>192.168.231.1</CLIENT_MACHINE>
<OVERWRITE>true</OVERWRITE>
-<PERMISSION_STATUS>
<USERNAME>Administrator</USERNAME>
<GROUPNAME>supergroup</GROUPNAME>
<MODE>420</MODE>
</PERMISSION_STATUS>
<RPC_CLIENTID>0c9a5af9-26a8-45d9-8754-cd0e7e47f65b</RPC_CLIENTID>
<RPC_CALLID>0</RPC_CALLID>
</DATA>
</RECORD>
</EDITS>
对Hdfs文件系统的每一个操作都保存在了edits文件中,每一个操作都是事务,有事务id——<TXID>7171</TXID>,还有当前操作做了什么<OPCODE>OP_ADD</OPCODE>,副本数,以及大小 2.2edits_inprogress_XXX:正在使用的过程,当前正在向前滚动。查看方式与上边一样 可以在web界面中查看过程的加载:
3.fsimage 通过上图的web界面我们可以看到,在启动Hdfs的时候,首先加载的是fsimage,然后再加载edits,那么fsimage_xxx是什么文件呢,我们使用命令hdfs oiv -i fsimage_XXX -o a.xml -p XML查看
<?xml version="1.0"?>
-<fsimage>
-<NameSection> <genstampV1>1000</genstampV1> <genstampV2>2046</genstampV2>
<genstampV1Limit>0</genstampV1Limit>
<lastAllocatedBlockId>1073742865</lastAllocatedBlockId>
<txid>7169</txid>
</NameSection>
-<INodeSection>
<lastInodeId>17786</lastInodeId>
-<inode>
<id>16385</id>
<type>DIRECTORY</type>
<name/>
<mtime>1489116410702</mtime>
<permission>zpx:supergroup:rwxr-xr-x</permission>
<nsquota>9223372036854775807</nsquota>
<dsquota>-1</dsquota>
</inode>
-<inode>
<id>17782</id>
<type>DIRECTORY</type>
<name>user</name>
<mtime>1489116410702</mtime>
<permission>zpx:supergroup:rwxr-xr-x</permission>
<nsquota>-1</nsquota>
<dsquota>-1</dsquota>
</inode>
-<inode>
<id>17783</id>
<type>DIRECTORY</type>
<name>zpx</name>
<mtime>1489117825394</mtime>
<permission>zpx:supergroup:rwxr-xrwx</permission>
<nsquota>-1</nsquota>
<dsquota>-1</dsquota>
</inode>
-<inode>
<id>17784</id>
<type>FILE</type>
<name>hello.txt</name>
<replication>3</replication>
<mtime>1489116471466</mtime>
<atime>1489116470750</atime>
<perferredBlockSize>134217728</perferredBlockSize>
<permission>zpx:supergroup:rw-r--r--</permission>
-<blocks>
-<block>
<id>1073742862</id>
<genstamp>2043</genstamp>
<numBytes>12</numBytes>
</block>
</blocks>
</inode>
<inode>
<inode>
<id>17786</id>
<type>FILE</type>
<name>a.txt</name>
<replication>3</replication>
<mtime>1489117825708</mtime>
<atime>1489117825394</atime>
<perferredBlockSize>134217728</perferredBlockSize>
<permission>Administrator:supergroup:rw-r--r--</permission>
-<blocks>
-<block>
<id>1073742865</id>
<genstamp>2046</genstamp>
<numBytes>12</numBytes>
</block>
</blocks>
</inode>
</INodeSection>
<INodeReferenceSection/>
-<SnapshotSection>
<snapshotCounter>0</snapshotCounter>
</SnapshotSection>
-<INodeDirectorySection>
-<directory>
<parent>16385</parent>
<inode>17782</inode>
</directory>
-<directory>
<parent>17782</parent>
<inode>17783</inode>
</directory>
-<directory>
<parent>17783</parent>
<inode>17786</inode>
<inode>17785</inode> <inode>17784</inode> </directory>
</INodeDirectorySection>
<FileUnderConstructionSection/> -<SnapshotDiffSection> -<diff>
<inodeid>16385</inodeid>
</diff>
</SnapshotDiffSection> -<SecretManagerSection> <currentId>0</currentId>
<tokenSequenceNumber>0</tokenSequenceNumber>
</SecretManagerSection>
-<CacheManagerSection>
<nextDirectiveId>1</nextDirectiveId>
</CacheManagerSection>
</fsimage>
我们可以看到在fsimage_xxx文件中,保存的是Hdfs文件系统的每个文件的基本信息,每个文件对应着一个<inode>节点,包括文件的一些基本信息,例如:文件名称,类型,块大小,副本数,id,文件之间的关系通过<parent>来进行管理。但是并没有发现文件存储的DataNode节点信息,为什么会这样呢?这是因为在Hdfs在启动的时候,DataNode会与NameNode来进行映射,告诉NameNode基本信息,进行动态的映射。这样NameNode只需要保存文件的基本信息就可以。 正如上面所分析的,Hadoop文件系统会出现编辑日志(edits)不断增长的情况,尽管在NameNode运行期间不会对文件系统造成影响,但是如果NameNode重新启动,它将会花费大量的时间运行编辑日志中的每个操作,在此期间也就是我们前面所说的安全模式下,文件系统是不可用的。 为了解决上述问题,Hadoop会运行一个Secondary NameNode进程,它的任务就是为原NameNode内存中的文件系统元数据产生检查点。其实说白了,就是辅助NameNode来处理fsimage文件与edits文件的一个进程。它从NameNode中复制fsimage与edits到临时目录并定期合并成一个新的fsimage并且删除原来的编辑日志edits。具体 步骤如下: (1)Secondary NameNode首先请求原NameNode进行edits的滚动,这样会产生一个新的编辑日志文件edits来保存对文件系统的操作(例如:上传新文件,删除文件,修改文件)。 (2)Secondary NameNode通过Http方式读取原NameNode中的fsimage及edits。 (3)Secondary NameNode将fsimage及edits进行合并产生新的fsimage (4)Secondary NameNode通过Http方式将新生成的fsimage发送到原来的NameNode中 (5)原NameNode用新生成的fsimage替换掉旧的fsimage文件,新生成的edits文件也就是(1)生成的滚动编辑日志文件替换掉之前的edits文件 图示如下:
通常来说,检查点由两个参数决定,Secondary NameNode每小时插入一个检查点(fs.chec-kpoint.period),如果编辑日志达到64M(fs.checkpoint.size),每5分钟检查一次。 上述的过程都是在安全模式下进行的。
|