2.1 Hadoop概述

2.1.1 Hadoop简介

随着现代社会的发展,各种信息数据存量与增量都非常大,很多情况下需要我们能够对TB级,甚至PB级数据集进行存储和快速分析,然而单机的计算机,无论是硬盘存储、网络IO、计算CPU还是内存都是非常有限的。针对这种情况,Hadoop应运而生。

那么,Hadoop是什么呢?我们可以很容易在一些比较权威的网站上找到它的定义,例如:Hadoop是一个由Apache基金会所开发的分布式系统基础架构,它可以使用户在不了解分布式底层细节的情况下开发分布式程序,充分利用集群的威力进行高速运算和存储。

从其定义就可以发现,它解决了两大问题:大数据存储、大数据分析。也就是Hadoop的两大核心:HDFS和MapReduce。

HDFS(Hadoop Distributed File System)是可扩展、容错、高性能的分布式文件系统,异步复制,一次写入多次读取,主要负责存储。

MapReduce为分布式计算框架,主要包含map(映射)和reduce(归约)过程,负责在HDFS上进行计算。

要深入学习Hadoop,就不得不提到Google的3篇相关论文,也就是Hadoop的基础理论。

❑2003年发表的《The Google File System》,奠定了“首个商用的超大型分布式文件系统”,从而验证这种分布式文件系统架构是可行的。

❑2004年发表的《MapReduce: Simplifed Data Processing on Large Clusters》,汲取了函数式编程设计思想,倡导把计算移动到数据思想,该思想的应用大大加快了数据的处理分析。

❑2006年发表的《Bigtable: A Distributed Storage System for Structured Data》,这一论文同样也是告诉大家,这种分布式数据库的架构是可行的。

说到这里,我们来简单了解下Hadoop的发展历史,如图2-1所示。

图2-1 Hadoop发展历史

2002~2004年,第一轮互联网泡沫刚刚破灭,很多互联网从业人员都失业了。我们的“主角”Doug Cutting也不例外,他只能写点技术文章赚点稿费来养家糊口。但是Doug Cutting不甘寂寞,怀着对梦想和未来的渴望,与他的好朋友Mike Cafarella一起开发出一个开源的搜索引擎Nutch,并历时一年把这个系统做到能支持亿级网页的搜索。但是当时的网页数量远远不止这个规模,所以两人不断改进,想让支持的网页量再多一个数量级。

在2003年和2004年,Google分别公布了GFS和MapReduce两篇论文。Doug Cutting和Mike Cafarella发现这与他们的想法不尽相同,且更加完美,完全脱离了人工运维的状态,实现了自动化。

在经过一系列周密考虑和详细总结后,2006年,Dog Cutting放弃创业,随后几经周折加入了Yahoo公司(Nutch的一部分也被正式引入),机缘巧合下,他以自己儿子的一个玩具大象的名字Hadoop命名了该项目。

当系统进入Yahoo以后,项目逐渐发展并成熟了起来。首先是集群规模,从最开始几十台机器的规模发展到能支持上千个节点的机器,中间做了很多工程性质的工作;然后是除搜索以外的业务开发,Yahoo逐步将自己广告系统的数据挖掘相关工作也迁移到了Hadoop上,使Hadoop系统进一步成熟化了。

2007年,纽约时报在100个亚马逊的虚拟机服务器上使用Hadoop转换了4TB的图片数据,更加加深了人们对Hadoop的印象。

在2008年的时候,一位Google的工程师发现要把当时的Hadoop放到任意一个集群中去运行是一件很困难的事情,所以就与几个好朋友成立了一个专门商业化Hadoop的公司Cloudera。同年,Facebook团队发现他们很多人不会写Hadoop的程序,而对SQL的一套东西很熟,所以他们就在Hadoop上构建了一个叫作Hive的软件,专门用于把SQL转换为Hadoop的MapReduce程序。

2011年,Yahoo将Hadoop团队独立出来,成立了一个子公司Hortonworks,专门提供Hadoop相关的服务。

说了这么多,那Hadoop有哪些优点呢?

Hadoop是一个能够让用户轻松架构和使用的分布式计算的平台。用户可以轻松地在Hadoop上开发和运行处理海量数据的应用程序。其优点主要有以下几个。

❑高可靠性:Hadoop按位存储和处理数据的能力值得人们信赖。

❑高扩展性:Hadoop是在可用的计算机集簇间分配数据并完成计算任务的,这些集簇可以方便地扩展到数以千计的节点中。

❑高效性:Hadoop能够在节点之间动态地移动数据,并保证各个节点的动态平衡,因此处理速度非常快。

❑高容错性:Hadoop能够自动保存数据的多个副本,并且能够自动将失败的任务重新分配。

❑低成本:与一体机、商用数据仓库以及QlikView、Yonghong Z-Suite等数据集市相比,Hadoop是开源的,项目的软件成本因此会大大降低。

❑Hadoop带有用Java语言编写的框架,因此运行在Linux生产平台上是非常理想的,Hadoop上的应用程序也可以使用其他语言编写,比如C++。

2.1.2 Hadoop存储——HDFS

Hadoop的存储系统是HDFS(Hadoop Distributed File System)分布式文件系统,对外部客户端而言,HDFS就像一个传统的分级文件系统,可以进行创建、删除、移动或重命名文件或文件夹等操作,与Linux文件系统类似。

但是,Hadoop HDFS的架构是基于一组特定的节点构建的(见图2-2),这些节点包括名称节点(NameNode,仅一个),它在HDFS内部提供元数据服务;第二名称节点(Secondary NameNode),名称节点的帮助节点,主要是为了整合元数据操作(注意不是名称节点的备份);数据节点(DataNode),它为HDFS提供存储块。由于仅有一个NameNode,因此这是HDFS的一个缺点(单点失败,在Hadoop2.X后有较大改善)。

图2-2 Hadoop HDFS架构

存储在HDFS中的文件被分成块,然后这些块被复制到多个数据节点中(DataNode),这与传统的RAID架构大不相同。块的大小(通常为128MB)和复制的块数量在创建文件时由客户机决定。名称节点可以控制所有文件操作。HDFS内部的所有通信都基于标准的TCP/IP协议。

关于各个组件的具体描述如下所示:

(1)名称节点(NameNode)

它是一个通常在HDFS架构中单独机器上运行的组件,负责管理文件系统名称空间和控制外部客户机的访问。NameNode决定是否将文件映射到DataNode上的复制块上。对于最常见的3个复制块,第一个复制块存储在同一机架的不同节点上,最后一个复制块存储在不同机架的某个节点上。

(2)数据节点(DataNode)

数据节点也是一个通常在HDFS架构中的单独机器上运行的组件。Hadoop集群包含一个NameNode和大量DataNode。数据节点通常以机架的形式组织,机架通过一个交换机将所有系统连接起来。

数据节点响应来自HDFS客户机的读写请求。它们还响应来自NameNode的创建、删除和复制块的命令。名称节点依赖来自每个数据节点的定期心跳(heartbeat)消息。每条消息都包含一个块报告,名称节点可以根据这个报告验证块映射和其他文件系统元数据。如果数据节点不能发送心跳消息,名称节点将采取修复措施,重新复制在该节点上丢失的块。

(3)第二名称节点(Secondary NameNode)

第二名称节点的作用在于为HDFS中的名称节点提供一个Checkpoint,它只是名称节点的一个助手节点,这也是它在社区内被认为是Checkpoint Node的原因。

如图2-3所示,只有在NameNode重启时,edits才会合并到fsimage文件中,从而得到一个文件系统的最新快照。但是在生产环境集群中的NameNode是很少重启的,这意味着当NameNode运行很长时间后,edits文件会变得很大。而当NameNode宕机时,edits就会丢失很多改动,如何解决这个问题呢?

图2-3 名称节点功能

注意

fsimage是Namenode启动时对整个文件系统的快照;edits是在Namenode启动后对文件系统的改动序列。

如图2-4所示,Secondary NameNode会定时到NameNode去获取名称节点的edits,并及时更新到自己fsimage上。这样,如果NameNode宕机,我们也可以使用SecondaryNamenode的信息来恢复NameNode。并且,如果SecondaryNameNode新的fsimage文件达到一定阈值,它就会将其拷贝回名称节点上,这样NameNode在下次重启时会使用这个新的fsimage文件,从而减少重启的时间。

图2-4 NameNode帮助节点SecondaryNameNode

举个数据上传的例子来深入理解下HDFS内部是怎么做的,如图2-5所示。

图2-5 HDFS文件上传

文件在客户端时会被分块,这里可以看到文件被分为5个块,分别是:A、B、C、D、E。同时为了负载均衡,所以每个节点有3个块。下面来看看具体步骤:

1)客户端将要上传的文件按128MB的大小分块。

2)客户端向名称节点发送写数据请求。

3)名称节点记录各个DataNode信息,并返回可用的DataNode列表。

4)客户端直接向DataNode发送分割后的文件块,发送过程以流式写入。

5)写入完成后,DataNode向NameNode发送消息,更新元数据。

这里需要注意:

1)写1T文件,需要3T的存储,3T的网络流量。

2)在执行读或写的过程中,NameNode和DataNode通过HeartBeat进行保存通信,确定DataNode活着。如果发现DataNode死掉了,就将死掉的DataNode上的数据,放到其他节点去,读取时,读其他节点。

3)宕掉一个节点没关系,还有其他节点可以备份;甚至,宕掉某一个机架也没关系;其他机架上也有备份。

2.1.3 Hadoop计算——MapReduce

MapReduce是Google提出的一个软件架构,用于大规模数据集(大于1TB)的并行运算。概念“Map(映射)”和“Reduce(归纳)”以及它们的主要思想,都是从函数式编程语言借来的,还有从矢量编程语言借来的特性。

当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归纳)函数,用来保证所有映射的键值对中的每一个共享相同的键组,如图2-6所示。

图2-6 Map/Reduce简单理解

下面将以Hadoop的“Hello World”例程——单词计数来分析MapReduce的逻辑,如图2-7所示。一般的MapReduce程序会经过以下几个过程:输入(Input)、输入分片(Splitting)、Map阶段、Shuffle阶段、Reduce阶段、输出(Final result)。

图2-7 Hadoop MapReduce单词计数逻辑

1)输入就不用说了,数据一般放在HDFS上面就可以了,而且文件是被分块的。关于文件块和文件分片的关系,在输入分片中说明。

2)输入分片:在进行Map阶段之前,MapReduce框架会根据输入文件计算输入分片(split),每个输入分片会对应一个Map任务,输入分片往往和HDFS的块关系很密切。例如,HDFS的块的大小是128MB,如果我们输入两个文件,大小分别是27MB、129MB,那么27MB的文件会作为一个输入分片(不足128M会被当作一个分片),而129MB则是两个输入分片(129-128=1,不足128MB,所以1MB也会被当作一个输入分片),所以,一般来说,一个文件块会对应一个分片。如图2-7所示,Splitting对应下面的三个数据应该理解为三个分片。

3)Map阶段:这个阶段的处理逻辑其实就是程序员编写好的Map函数,因为一个分片对应一个Map任务,并且是对应一个文件块,所以这里其实是数据本地化的操作,也就是所谓的移动计算而不是移动数据。如图2-7所示,这里的操作其实就是把每句话进行分割,然后得到每个单词,再对每个单词进行映射,得到单词和1的键值对。

4)Shuffle阶段:这是“奇迹”发生的地方,MapReduce的核心其实就是Shuffle。那么Shuffle的原理呢?Shuffle就是将Map的输出进行整合,然后作为Reduce的输入发送给Reduce。简单理解就是把所有Map的输出按照键进行排序,并且把相对键的键值对整合到同一个组中。如图2-7所示,Bear、Car、Deer、River是排序的,并且Bear这个键有两个键值对。

5)Reduce阶段:与Map类似,这里也是用户编写程序的地方,可以针对分组后的键值对进行处理。如图2-7所示,针对同一个键Bear的所有值进行了一个加法操作,得到<Bear,2>这样的键值对。

6)输出:Reduce的输出直接写入HDFS上,同样这个输出文件也是分块的。

说了这么多,其实MapReduce的本质用一张图可以完整地表现出来,如图2-8所示。

图2-8 MapReduce本质

MapReduce的本质就是把一组键值对<K1, V1>经过Map阶段映射成新的键值对<K2, V2>;接着经过Shuffle/Sort阶段进行排序和“洗牌”,把键值对排序,同时把相同的键的值整合;最后经过Reduce阶段,把整合后的键值对组进行逻辑处理,输出到新的键值对<K3, V3>。这样的一个过程,其实就是MapReduce的本质。

Hadoop MapReduce可以根据其使用的资源管理框架不同,而分为MR v1和YARN/MR v2版本,如图2-9所示。

图2-9 MapReduce发展历史

在MR v1版本中,资源管理主要是Jobtracker和TaskTracker。Jobtracker主要负责:作业控制(作业分解和状态监控),主要是MR任务以及资源管理;而TaskTracker主要是调度Job的每一个子任务task;并且接收JobTracker的命令。

在YARN/MR v2版本中,YARN把JobTracker的工作分为两个部分:

1)ResourceManager(资源管理器)全局管理所有应用程序计算资源的分配。

2)ApplicationMaster负责相应的调度和协调。

NodeManager是每一台机器框架的代理,是执行应用程序的容器,监控应用程序的资源(CPU、内存、硬盘、网络)使用情况,并且向调度器汇报。

2.1.4 Hadoop资源管理——YARN

在上一节中我们看到,当MapReduce发展到2.x时就不使用JobTracker来作为自己的资源管理框架,而选择使用YARN。这里需要说明的是,如果使用JobTracker来作为Hadoop集群的资源管理框架的话,那么除了MapReduce任务以外,不能够运行其他任务。也就是说,如果我们集群的MapReduce任务并没有那么饱满的话,集群资源等于是白白浪费的。所以提出了另外的一个资源管理架构YARN(Yet Another Resource Manager)。这里需要注意,YARN不是JobTracker的简单升级,而是“大换血”。同时Hadoop 2.X也包含了此架构。Apache Hadoop 2.X项目包含以下模块。

❑Hadoop Common:为Hadoop其他模块提供支持的基础模块。

❑HDFS: Hadoop:分布式文件系统。

❑YARN:任务分配和集群资源管理框架。

❑MapReduce:并行和可扩展的用于处理大数据的模式。

如图2-10所示,YARN资源管理框架包括ResourceManager(资源管理器)、ApplicationMaster、NodeManager(节点管理器)。各个组件描述如下。

图2-10 YARN架构

(1)ResourceManager

ResourceManager是一个全局的资源管理器,负责整个系统的资源管理和分配。它主要由两个组件构成:调度器(Scheduler)和应用程序管理器(ApplicationManager, AM)。

Scheduler负责分配最少但满足Application运行所需的资源量给Application。Scheduler只是基于资源的使用情况进行调度,并不负责监视/跟踪Application的状态,当然也不会处理失败的Task。

ApplicationManager负责处理客户端提交的Job以及协商第一个Container以供ApplicationMaster运行,并且在ApplicationMaster失败的时候会重新启动ApplicationMaster(YARN中使用Resource Container概念来管理集群的资源,Resource Container是资源的抽象,每个Container包括一定的内存、IO、网络等资源)。

(2)ApplicationMaster

ApplicatonMaster是一个框架特殊的库,每个Application有一个ApplicationMaster,主要管理和监控部署在YARN集群上的各种应用。

(3)NodeManager

主要负责启动Resourcemanager分配给ApplicationMaster的Container,并且会监视Container的运行情况。在启动Container的时候,NodeManager会设置一些必要的环境变量以及相关文件;当所有准备工作做好后,才会启动该Container。启动后,NodeManager会周期性地监视该Container运行占用的资源情况,若是超过了该Container所声明的资源量,则会kill掉该Container所代表的进程。

如图2-11所示,该集群上有两个任务(对应Node2、Node6上面的AM),并且Node2上面的任务运行有4个Container来执行任务;而Node6上面的任务则有2个Container来执行任务。

图2-11 YARN集群

2.1.5 Hadoop生态系统

如图2-12所示,Hadoop的生态圈其实就是一群动物在狂欢。我们来看看一些主要的框架。

图2-12 Hadoop生态系统

(1)HBase

HBase(Hadoop Database)是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统,利用HBase技术可在廉价PC Server上搭建起大规模结构化存储集群。

(2)Hive

Hive是建立在Hadoop上的数据仓库基础构架。它提供了一系列的工具,可以用来进行数据提取转化加载(ETL),这是一种可以存储、查询和分析存储在Hadoop中的大规模数据的机制。

(3)Pig

Pig是一个基于Hadoop的大规模数据分析平台,它提供的SQL-LIKE语言叫作Pig Latin。该语言的编译器会把类SQL的数据分析请求转换为一系列经过优化处理的MapReduce运算。

(4)Sqoop

Sqoop是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(MySQL、post-gresql等)间进行数据的传递,可以将一个关系型数据库中的数据导入Hadoop的HDFS中,也可以将HDFS的数据导入关系型数据库中,如图2-13所示。

图2-13 Sqoop功能

(5)Flume

Flume是Cloudera提供的一个高可用、高可靠、分布式的海量日志采集、聚合和传输的系统,Flume支持在日志系统中定制各类数据发送方,用于收集数据。同时,Flume提供对数据进行简单处理并写到各种数据接受方(可定制)的能力,如图2-14所示。

图2-14 Flume数据传输

(6)Oozie

Oozie是基于Hadoop的调度器,以XML的形式写调度流程,可以调度Mr、Pig、Hive、shell、jar任务等。

主要的功能如下。

1)Workflow:顺序执行流程节点,支持fork(分支多个节点)、join(将多个节点合并为一个)。

2)Coordinator:定时触发Workflow。

3)Bundle Job:绑定多个Coordinator。

(7)Chukwa

Chukwa是一个开源的、用于监控大型分布式系统的数据收集系统。它构建在Hadoop的HDFS和MapReduce框架上,继承了Hadoop的可伸缩性和鲁棒性。Chukwa还包含了一个强大和灵活的工具集,可用于展示、监控和分析已收集的数据。

(8)ZooKeeper

ZooKeeper是一个开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件,如图2-15所示。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。

图2-15 Zookeeper架构

(9)Avro

Avro是一个数据序列化的系统。它可以提供:丰富的数据结构类型、快速可压缩的二进制数据形式、存储持久数据的文件容器、远程过程调用RPC。

(10)Mahout

Mahout是Apache Software Foundation(ASF)旗下的一个开源项目,提供一些可扩展的机器学习领域经典算法的实现,旨在帮助开发人员更加方便快捷地创建智能应用程序。Mahout包含许多实现,包括聚类、分类、推荐过滤、频繁子项挖掘。此外,通过使用Apache Hadoop库,可以有效地将Mahout扩展到云中。