# 从Hadoop开始-HDFS与MapReduce
## Hadoop由来
2003年,Google发表了一篇技术学术论文,公开介绍了自己的谷歌文件系统**GFS(Google File System)**。这是Google公司为了存储海量搜索数据而设计的专用文件系统。
第二年,也就是2004年,Doug Cutting基于Google的GFS论文,实现了**分布式文件存储系统**,并将它命名为**NDFS(Nutch Distributed File System)**。
还是2004年,Google又发表了一篇技术学术论文,介绍自己的**MapReduce编程模型**。这个编程模型,用于大规模数据集(大于1TB)的并行分析运算。
第二年(2005年),Doug Cutting又基于MapReduce,在Nutch搜索引擎实现了该功能。
加盟Yahoo之后,Doug Cutting将NDFS和MapReduce进行了升级改造,并重新命名为**Hadoop**(NDFS也改名为HDFS,Hadoop Distributed File System)。
这个,就是后来大名鼎鼎的大数据框架系统——Hadoop的由来。而Doug Cutting,则被人们称为**Hadoop之父**。
***
## Hadoop核心架构
Hadoop的核心,说白了,就是**HDFS**和**MapReduce**。HDFS为海量数据提供了**存储**,而MapReduce为海量数据提供了**计算框架**。
![hadoop架构.jpg](https://www.concoding.com/upload/2021/03/hadoop%E6%9E%B6%E6%9E%84-a78b47ee52c84464a658d8b537fcf4b5.jpg)
***
### HDFS
HDFS是分布式文件系统,是GFS的开源实现,它的特点如下:
- 能够运行在廉价机器上,硬件出错常态,需要具备高容错性
- 流式数据访问,而不是随机读写
- 面向大规模数据集,能够进行批处理、能够横向扩展
- 简单一致性模型,假定文件是一次写入、多次读取
**缺点:**
- 不支持低延迟数据访问
- 不适合大量小文件存储(因为每条元数据占用空间是一定的)
- 不支持并发写入,一个文件只能有一个写入者
- 不支持文件随机修改,仅支持追加写入
#### HDFS架构
**1.0版本**
HDFS 采用的是 Master/Slave 架构,一个 HDFS 集群包含一个单独的 NameNode 和多个 DataNode 节点,如下图所示(这个图是 HDFS1.0的架构图,经典的架构图):
![hdfs1.0架构.jpg](https://www.concoding.com/upload/2021/03/hdfs1.0%E6%9E%B6%E6%9E%84-0878f18da8d8450f8ecaea8aeb10ccd8.jpg)
#### NameNode
NameNode 负责管理整个分布式系统的元数据,主要包括:
- 目录树结构;
- 文件到数据库 Block 的映射关系;
- Block 副本及其存储位置等管理数据;
- DataNode 的状态监控,两者通过段时间间隔的心跳来传递管理信息和数据信息,通过这种方式的信息传递,NameNode 可以获知每个 DataNode 保存的 Block 信息、DataNode 的健康状况、命令 DataNode 启动停止等(如果发现某个 DataNode 节点故障,NameNode 会将其负责的 block 在其他 DataNode 上进行备份)。
这些数据保存在内存中,同时在磁盘保存两个元数据管理文件:fsimage 和 editlog。
- **fsimage**:是内存命名空间元数据在外存的镜像文件,保存文件系统目录树结构;
- **editlog**:则是各种元数据操作的WAL预写日志文件,在体现到内存数据变化前首先会将操作记入 editlog 中,以防止数据丢失。
这两个文件相结合可以构造完整的内存数据。
> **WAL-预写式日志**
>
> **预写式日志(Write-ahead logging,缩写 WAL)**是关系数据库系统中用于提供原子性和持久性(ACID属性中的两个)的一系列技术。在使用 WAL 的系统中,所有的修改在**提交之前**都要先**写入 log 文件**中。
>
> WAL 机制的原理也很简单:**「修改并不直接写入到文件系统中,而是写入到另外一个称为 WAL 的文件中(本文的editlog),写入操作写入到log文件中才算写入成功。」**
>
> 这样做的目的是:FsImage对于HDFS来讲规模是非常大的,每次修改FsImage非常慢,所以每次先不更新FsImage,先记录在EditLog中,提高操作效率。
>
> WAL还有一个重要的作用是容灾回复,一旦集群宕机,重启后可以重跑一边WAL日志恢复数据。
>**EditLog刷盘机制**
>
>上面讲到了namenode响应客户端的操作后,会将操作日志写入到editLog文件中,那么editLog文件的刷盘机制是怎样的呢?也就是editLog文件中的内容什么时候会被真正写入到磁盘呢?
>
>1. SNN触发刷盘
>
> 在配置了HA的模式下,SNN为了尽可能的保持与ANN(active namenode,后面均简称ANN)的元数据一致,因此会不断从ANN中读取editLog文件的内容,这样当ANN出现异常时,可以快速接管。
>
> 因此,在SNN中,每隔一段时间就会检测当前时间距离上一次刷盘的时间是否超过了指定的值,如果超过了则通过rpc请求触发ANN进行editLog的刷盘,然后再从ANN中读取edtiLog文件的内容。
>
>2. ANN触发刷盘
>
> ANN启动后,也会定时进行检测,当未刷盘的操作事务累计到一定数量后,触发进行刷盘动作。
>
>3. 另外,editLog文件打开时,设置了一定大小的缓存(512KB),每个操作事务都是先写入缓存,每次写完后,判断当前缓存大小是否超过了设置的指定大小,如果是则将缓存中的内容刷到磁盘上。
>
>4. 实际上,你还可以通过配置进行设置,每一条操作写入文件后,都立即进行刷盘,这种方式不是默认配置,也不推荐使用(毕竟有性能损耗)。
#### Secondary NameNode
Secondary NameNode 并**不是** NameNode 的热备机,而是定期从 NameNode 拉取 fsimage 和 editlog 文件,并对两个文件进行合并,形成新的 fsimage 文件并传回 NameNode,这样做的目的是**减轻 NameNod 的工作压力**,本质上 SNN 是一个提供检查点(check point)功能服务的服务点,并**不是**NN的一个备份主机。
>每次NameNode启动时都会跑一遍editlog的操作以保证fsimage最新,并生成一个新的空editlog文件。可是实际上NameNode是很少重启的,这将导致每次重启NameNode时都会重跑巨大的EditLog文件,而Secondary NameNode可以定时合并fsimage和EditLog,让EditLog文件体积不至于堆积过大,减轻NameNode压力,这也是SNN的好处。
Fsimage和EditLog合并流程:
1. 将hdfs更新记录写入一个新的文件——edits.new。
2. 将fsimage和editlog通过http协议发送至secondary namenode。
3. 将fsimage与editlog合并,生成一个新的文件——fsimage.ckpt。这步之所以要在secondary namenode中进行,是因为比较耗时,如果在namenode中进行或导致整个系统卡顿。
4. 将生成的fsimage.ckpt通过http协议发送至namenode。
5. 重命名fsimage.ckpt为fsimage,edits.new为edits。
![SNN.png](https://www.concoding.com/upload/2021/03/SNN-3605e145f4ae49fea1cacb63d964a5d8.png)
#### DataNode
负责数据块的实际存储和读写工作,Block 默认是64MB(HDFS2.0改成了128MB),当客户端上传一个大文件时,HDFS 会自动将其切割成固定大小的 Block,为了保证数据可用性,每个 Block 会以多备份的形式存储,默认是3份。
> 上传文件的切分是由Client负责完成的。
***
### HDFS数据读写过程
要把读写过程细节搞明白前,你必须知道block、packet与chunk。下面分别讲述。
- **block**
这个大家应该知道,文件上传前需要分块,这个块就是block,一般为128MB,当然你可以去改,不顾不推荐。因为块太小:寻址时间占比过高。块太大:Map任务数太少,作业执行速度变慢。它是最大的一个单位。
- **packet**
packet是第二大的单位,它是client端向DataNode,或DataNode的PipLine之间传数据的基本单位,默认64KB。
- **chunk**
chunk是最小的单位,它是client向DataNode,或DataNode的PipLine之间进行数据校验的基本单位,默认512Byte,因为用作校验,故每个chunk需要带有4Byte的校验位。所以实际每个chunk写入packet的大小为516Byte。由此可见真实数据与校验值数据的比值约为128 : 1。(即64*1024 / 512)
例如,在client端向DataNode传数据的时候,HDFSOutputStream会有一个chunk buff,写满一个chunk后,会计算校验和并写入当前的chunk。之后再把带有校验和的chunk写入packet,当一个packet写满后,packet会进入dataQueue队列,其他的DataNode就是从这个dataQueue获取client端上传的数据并存储的。同时一个DataNode成功存储一个packet后之后会返回一个ack packet,放入ack Queue中。
#### HDFS写文件:
如图示:
![hdfswriteflow.png](https://www.concoding.com/upload/2021/03/hdfs-write-flow-be636042a5df4825a2516427d7738a98.png)
具体过程如下:
1. 客户端向NameNode发出写文件请求。
2. namenode收到客户端的请求后,首先会检测元数据的目录树;检查权限并判断待上传的文件是否已存在,如果已存在,则拒绝client的上传。如果不存在,则响应客户端可以上传。
> 注:WAL,write ahead log,先写Log,再写内存,因为EditLog记录的是最新的HDFS客户端执行所有的写操作。如果后续真实写操作失败了,由于在真实写操作之前,操作就被写入EditLog中了,故EditLog中仍会有记录,我们不用担心后续client读不到相应的数据块,因为在第5步中DataNode收到块后会有一返回确认信息,若没写成功,发送端没收到确认信息,会一直重试,直到成功
3. 客户端收到可以上传的响应后,会把待上传的文件切块(hadoop2.x默认块大小为128M);然后再次给namenode发送请求,上传第一个block块。
4. namenode收到客户端上传block块的请求后,首先会检测其保存的datanode信息,确定该文件块存储在那些节点上;最后,响应给客户端一组datanode节点信息。
5. 客户端根据收到datanode节点信息,首先就近与某台datanode建立网络连接;然后该datanode节点会与剩下的节点建立传输通道(链式),通道连通后返回确认信息给客户端;表示通道已连通,可以传输数据。
6. 客户端收到确认信息后,通过网络向就近的datanode节点写第一个block块的数据;就近的datanode收到数据后,首先会缓存起来;然后将缓存里数据保存一份到本地,一份发送到传输通道;让剩下的datanode做备份。
client将NameNode返回的分配的可写的DataNode列表和Data数据一同发送给最近的第一个DataNode节点,此后client端和NameNode分配的多个DataNode构成pipeline管道,client端向输出流对象中写数据。client每向第一个DataNode写入一个packet,这个packet便会直接在pipeline里传给第二个、第三个…DataNode。
> 注:并不是写好一个块或一整个文件后才向后分发
每个DataNode写完一个块后,会返回确认信息。
> 注:并不是每写完一个packet后就返回确认信息,个人觉得因为packet中的每个chunk都携带校验信息,没必要每写一个就汇报一下,这样效率太慢。正确的做法是写完一个block块后,对校验信息进行汇总分析,就能得出是否有块写错的情况发生
7. 第一个block块写入完毕,若客户端还有剩余的block未上传;则客户端会从(3)开始,继续执行上述步骤;直到整个文件上传完毕。
写完数据,关闭输输出流。
发送完成信号给NameNode。
> 注:发送完成信号的时机取决于集群是强一致性还是最终一致性,强一致性则需要所有DataNode写完后才向NameNode汇报。最终一致性则其中任意一个DataNode写完后就能单独向NameNode汇报,HDFS一般情况下都是强调强一致性
#### HDFS读文件
如图示:
![hdfsreadflow.png](https://www.concoding.com/upload/2021/03/hdfs-read-flow-968de1c503c34001918bebaefb1987dc.png)
其具体过程总结如下(简单总结一下):
1. Client 通过 DistributedFileSystem 对象与集群的 NameNode 进行一次 RPC 远程调用,获取文件 block 位置信息;
2. NameNode 返回存储的每个块的 DataNode 列表;
3. Client 将连接到列表中最近的 DataNode;
4. Client 开始从 DataNode 并行读取数据;
5. 一旦 Client 获得了所有必须的 block,它就会将这些 block 组合起来形成一个文件。
在处理 Client 的读取请求时,HDFS 会利用机架感知选举最接近 Client 位置的副本,这将会减少读取延迟和带宽消耗。
***
### HDFS1.0的不足与改进
在前面的介绍中,关于 HDFS1.0 的架构,首先都会看到 NameNode 的单点问题,这个在生产环境中是非常要命的问题,早期的 HDFS 由于规模较小,有些问题就被隐藏了,但自从进入了移动互联网时代,很多公司都开始进入了 PB 级的大数据时代,HDFS 1.0的设计缺陷已经无法满足生产的需求,最致命的问题有以下两点:
- NameNode 的单点问题,如果 NameNode 挂掉了,数据读写都会受到影响,HDFS 整体将变得不可用,这在生产环境中是不可接受的;
- 水平扩展问题,随着集群规模的扩大,1.0 时集群规模达到3000时,会导致整个集群管理的文件数目达到上限(因为 NameNode 要管理整个集群 block 元信息、数据目录信息等)。
为了解决上面的两个问题,Hadoop2.0 提供一套统一的解决方案:
1. HA(High Availability 高可用方案):这个是为了解决 NameNode 单点问题;
2. NameNode Federation:是用来解决 HDFS 集群的线性扩展能力。
### HDFS2.0HA实现
这里先看下 HDFS 高可用解决方案的架构设计,如下图所示:
![hdfsha.png](https://www.concoding.com/upload/2021/03/hdfs-ha-bfee271c88ac4e64a3c6db62f140ffd2.png)
这里与前面 1.0 的架构已经有很大变化,简单介绍一下上面的组件:
1. Active NameNode 和 Standby NameNode:两台 NameNode 形成互备,一台处于 Active 状态,为主 NameNode,另外一台处于 Standby 状态,为备 NameNode,只有主 NameNode 才能对外提供读写服务;
2. ZKFailoverController(主备切换控制器,FC):ZKFailoverController 作为独立的进程运行,对 NameNode 的主备切换进行总体控制。ZKFailoverController 能及时检测到 NameNode 的健康状况,在主 NameNode 故障时借助 Zookeeper 实现自动的主备选举和切换(当然 NameNode 目前也支持不依赖于 Zookeeper 的手动主备切换);
3. Zookeeper 集群:为主备切换控制器提供主备选举支持;
4. 共享存储系统:共享存储系统是实现 NameNode 的高可用最为关键的部分,共享存储系统保存了 NameNode 在运行过程中所产生的 HDFS 的元数据。主 NameNode 和备 NameNode 通过共享存储系统实现元数据同步。在进行主备切换的时候,新的主 NameNode 在**确认元数据完全同步之后才能继续对外提供服务**。
5. DataNode 节点:因为主 NameNode 和备 NameNode 需要共享 HDFS 的数据块和 DataNode 之间的映射关系,为了使故障切换能够快速进行,DataNode 会同时向主 NameNode 和备 NameNode 上报数据块的位置信息。
> HDFS2.0架构有许多改进,本文暂时不探讨。
***
### MapReduce
再来看MapReduce,MapReduce其实是一种编程模型。这个模型的核心步骤主要分两部分:**Map(映射)**和**Reduce(归约)**。
当你向MapReduce框架提交一个计算作业时,它会首先把计算作业拆分成若干个**Map任务**,然后分配到不同的节点上去执行,每一个Map任务处理输入数据中的一部分,当Map任务完成后,它会生成一些中间文件,这些中间文件将会作为**Reduce任务**的输入数据。Reduce任务的主要目标就是把前面若干个Map的输出汇总到一起并输出。
> 大家对MR模型已有一定了解,这里不再赘述,这里只讨论MR任务执行时的资源调度框架。
在MapReduce里,为了完成上面这些过程,需要两个角色:**JobTracker**和**TaskTracker**。
![mapreduce1.0.jpg](https://www.concoding.com/upload/2021/03/mapreduce1.0-2334f8ba38b7454bb9e890a4aee1d07b.jpg)
JobTracker用于调度和管理其它的TaskTracker。JobTracker可以运行于集群中任一台计算机上。TaskTracker 负责执行任务,必须运行于 DataNode 上。
但是,1.0版本中,存在一些问题:
1. 扩展性差,JobTracker负载较重,成为性能瓶颈。
2. 可靠性差,NameNode只有一个,万一挂掉,整个系统就会崩溃(2.0中加入HA)。
3. 仅适用MapReduce一种计算方式。
4. 资源管理的效率比较低。
Hadoop2.0的诞生有效解决了这些问题。
***
## Hadoop1.0与2.0
### **Namenode的HA**
NameNode其实是Hadoop的一个目录服务,它包含着整个集群存储的文件的元数据。
早期发行的Hadoop1版本将所有HDFS目录和文件的元数据存储到一个NameNode单点。整个集群的数据状态取决于这个单点的成败。随后的版本添加了一个secondary NameNode节点,作为冷备份的从NameNode节点。Secondary NameNode节点周期性地将写日志(edit log)和NameNode的映象文件(image file)合并,这样做有两个优点:首先,由于主NameNode节点在启动的时候不需要完全合并写日志和映象文件,因此主NameNode节点的启动时间缩短了;其次,从NameNode节点复制NameNode的所有信息,这样当NameNode节点出现不可恢复的故障时,数据丢失会降到最低。但是,secondary NameNode并不是一个热备份节点,这意味着故障切换时间和恢复时间较长,且集群可用性会受到影响。
Hadoop2做出了改进,有了NameNode的热备节点。当主NameNode节点故障了,从NameNode就能够在自动转变成主NameNode,这就意味着hadoop集群可以提供无数据丢失且不间断的NameNode服务,并且自动故障切换也比较容易实现。
热备份的关键在于维护它的数据尽可能与主NameNode节点保持一致,可以通过读取主NameNode的写日志文件并在备份节点上执行来实现,并且延时也是非常低的。写日志文件的共享可以使用以下两种方法来实现:
在主NameNode和从NameNode节点间使用共享的网络文件系统(Network File System,NFS)存储目录:主NameNode往共享目录中写入日志,而从NameNode监听这个共享目录的变更消息,然后拉取这些变更。
使一组JournalNode(quorum of Journal Nodes):主NameNode将写日志发送到部分JournalNode以记录信息,而从NameNode持续监听这些JournalNode,从而更新和同步主NameNode的状态。
### **JobTracker 和 [YARN](https://www.concoding.com/archives/yarn%E4%B8%AD%E7%9A%84%E4%BD%9C%E4%B8%9A%E7%AE%A1%E7%90%86)**
Hadoop1采用集中式作业流控制,然而集中式系统由于其负载的单点问题,很难实现扩展。一旦JobTracker(作业跟踪器)出现故障,系统中所有的作业都必须重新启动,这对整个集中式组件造成了极大压力。由于计算模型是和集群的资源紧密联系的,所以只能支持MapReduce一种计算模型。这种紧密的耦合导致开发者强行适配其他的计算模型,从而出现了与MapReduce设计意图相悖的使用方式。按照这种模式,Hadoop很难与其他类型的集群进行集成。
Hadoop2引入了[YARN](https://www.concoding.com/archives/yarn%E4%B8%AD%E7%9A%84%E4%BD%9C%E4%B8%9A%E7%AE%A1%E7%90%86)。[YARN](https://www.concoding.com/archives/yarn%E4%B8%AD%E7%9A%84%E4%BD%9C%E4%B8%9A%E7%AE%A1%E7%90%86)的主要设计目标是将大家比较关注的资源管理(resource management)和应用执行(application execution)之间的耦合隔离,然后其他的应用模式就可以在Hadoop集群上执行了。增强不同计算模型和各种应用之间的交互,使得集群的资源得到高效的利用,同时也能更好地与企业中已经存在的计算结构集成在一起。
### **HDFS快照**
Haddoop2 引入了快照(snapshot)。快照是文件系统的整体或部分目录在某个时间点的只读镜像(image),通常是为了以下三个原因:
防止用户的错误操作导致的数据损坏或丢失、备份、容灾
快照仅在NameNode上实现,它不会涉及数据从一个数据节点复制到另一个数据节点,而仅仅是复制了块列表以及文件的大小。生成一个快照的操作几乎是瞬间完成的,它不会影响NameNode节点的性能。
### **序列化方式**
Hadoop1的RPC通信协议是使用Java的Writables序列化实现的,但在Hadoop2中是基于Protocol Buffers实现的。这个改进不仅很容易保持向后兼容,而且帮助集群中的不同组件实现了滚动升级(rolling the upgrades)。另外,RPC也允许在客户端实现重试功能。
### **支持SSD感知**
Hadoop1是不感知存储设备的类型的,这意味着机械硬盘和SSD(固态硬盘)被一样对待。用户无法对数据的布局做任何干预。2014年发布的Hadoop2版本能够识别存储设备的类型,并且应用程序可以获取到这些信息。这样,应用程序就可以通过这些信息来优化它们的数据存取和布局策略。
### **HDFS IO方面的改进**
Hadoop1是通过HDFS客户端访问文件系统的。Hadoop2开始支持NFSv3,促进了NFS网关组件的诞生。现在,HDFS可以挂载(mount)到用户本地兼容的文件系统上,他们可以直接往HDFS下载或上传文件。往已有的文件追加内容是可以的,但是随机写(random write)是不支持的。同时,Hadoop2的I/O也进行了大量的改进。例如,在Hadoop1中,当客户端运行在某个数据节点上时,它需要通过TCP来读取本地数据。但是,有了本地快捷读取(short-circuit local reads),客户端就可以直接读取本地的数据;通过特定的接口还可以实现零复制(zero-copy)数据读取;读或写数据的CRC校验码计算方法也进行了优化。
### **支持更多的操作系统**
Hadoop 2.X天然支持微软的Windows系统。这个转变使得微软的Windows服务器有极好的机会进入大数据处理领域。当然,部分原因得归功于Hadoop开发使用的Java编程语言有很好的可移植性,但更重要的原因在于Hadoop对计算和存储的通用性的增强,使其能支持包括Windows在内的系统。
从Hadoop开始-HDFS与MapReduce