2.1 Hadoop框架
近年来,Hadoop已经逐渐成为大数据分析领域最受欢迎的解决方案,像eBay这样大型的电子商务企业,一直在使用Hadoop技术从数据中挖掘价值,例如,通过大数据提高用户的搜索体验,识别和优化精准广告投放,以及通过点击率分析以理解用户如何使用它的在线市场平台等。目前,eBay的Hadoop集群总节点数超过10000多个,存储容量超过170PB。
Hadoop框架是用Java编写的,它的核心是HDFS(Hadoop分布式文件系统)和MapReduce。HDFS为大数据提供了存储,MapReduce为大数据提供了计算。HDFS可以保存比一个机器的可用存储空间更大的文件,这是因为HDFS是一套具备可扩展能力的存储平台,能够将数据分发至成千上万个分布式节点及低成本服务器之上,并让这些硬件设备以并行方式共同处理同一任务。Hadoop框架实现了名为MapReduce的编程范式,这个范式实现了大规模的计算:应用程序被分割成许多小部分,而每个部分在集群中的节点上并行执行(每个节点处理自己的数据)。MapReduce和分布式文件系统的设计,使得应用程序能够在成千上万的独立计算的电脑上运行并操纵PB级的数据。
Hadoop框架包括Hadoop内核、MapReduce、HDFS和Hadoop YARN等。Hadoop也是一个生态系统,在这里面有很多的组件。除了HDFS和MapReduce,有NoSQL数据库的HBase,有数据仓库工具Hive,有Pig工作流语言,有机器学习算法库Mahout,在分布式系统中扮演重要角色的Zookeeper,有内存计算框架的Spark,有数据采集的Flume和Kafka。总之,用户可以在Hadoop平台上开发和部署任何大数据应用程序。
2.1.1 HDFS(分布式文件系统)
HDFS是Hadoop Distribute File System(Hadoop分布式文件系统)的简称,是Hadoop的一个分布式文件系统。HDFS是一个可运行在廉价机器上的可容错分布式文件系统。它既有分布式文件系统的共同点,又有自己的一些明显的特征。在海量数据的处理中,我们经常碰到一些大文件(几百GB甚至TB级别)。在常规的系统上,这些大文件的读和写需要花费大量的时间。HDFS优化了大文件的流式读取方式,它把一个大文件分割成一个或者多个数据块(默认的大小为64MB),分发到集群的节点上,从而实现了高吞吐量的数据访问,这个集群拥有数百个节点,并支持千万级别的文件。因此,HDFS非常适合大规模数据集上的应用。
HDFS设计者认为硬件故障是经常发生的,所以采用了块复制的概念,让数据在集群的节点间进行复制(HDFS有一个复制因子参数,默认为3),从而实现了一个高度容错性的系统。当硬件出现故障(如:硬盘坏了)的时候,复制的数据就可以保证数据的高可用性。正是因为这个容错的特点,HDFS适合部署在廉价的机器上。当然,一块数据和它的备份不能放在同一个机器上,否则这台机器挂了,备份也同样没办法找到。HDFS使用一种机架位感知的办法,先把一份拷贝放入同机架上的机器,然后再拷贝一份到其他服务器,这台服务器也许是位于不同数据中心的,这样,如果某个数据点坏了,就从另一个机架上调用。除了机架位感知的办法,现在还有基于erasure code(一种编码存储技术)的方法。这种方法本来是用于通信容错领域的办法,可以节约空间又达到容错的目的,感兴趣的读者可以去查询相关材料。
HDFS是一个主从结构。如图2-1所示,一个HDFS集群是由一个名字节点(NameNode)和多个数据节点(DataNode)组成,它们通常配置在不同的机器上。HDFS将一个文件分割成一个或多个块,这些块被存储在一组数据节点中。名字节点用来操作文件命名空间的文件或目录操作,如:打开、关闭、重命名等等,它同时确定块与数据节点的映射。数据节点负责来自文件系统客户的读写请求。数据节点同时还要执行块的创建、删除,以及来自名字节点的块复制指令。
图2-1 HDFS架构
一个名字节点保存着集群上所有文件的目录树,以及每个文件数据块的位置信息。它是一个管理文件命名空间和客户端访问文件的主服务器,但是它不真正存储文件数据本身。数据节点通常是一个节点或一个机器,它用来真正地存放文件数据(和复制数据),管理着从NameNode分配过来的数据块,并管理对应节点的数据存储。HDFS对外开放文件命名空间并允许用户数据以文件形式存储。图2-1所示显示了一个HDFS的架构。
●客户端应用:每当需要定位一个文件或添加/复制/移动/删除一个文件时,与名字节点交互,获取文件位置信息(返回相关的数据节点信息);与数据节点交互,读取和写入数据。
●名字节点(NameNode):HDFS文件系统的核心节点,保存着集群中所有数据块位置的一个目录。它管理HDFS的名称空间和数据块映射信息,配置副本策略,处理客户端请求。
●数据节点(DataNode):存储实际的数据,汇报存储信息给NameNode。启动后,DataNode连接到NameNode,响应NameNode的文件操作请求。一旦NameNode提供了文件数据的位置信息,客户端应用可以直接与DataNode联系。DataNode并不能感知集群中其他DataNode的存在。对于MapReduce而言,我们建议把TaskTracker实例与DataNode部署在同一个服务器上,从而保证TaskTracker能够就近访问数据。DataNode之间可以直接通信,数据复制就是在DataNode之间完成的。
名字节点和数据节点都是运行在普通的机器之上的软件,一般都用Linux操作系统。因为HDFS是用Java编写的,任何支持Java的机器都可以运行名字节点或数据节点。我们很容易将HDFS部署到大范围的机器上。典型的部署是由一个专门的机器来运行名字节点软件,集群中的其他机器每台运行一个数据节点实例。体系结构不排斥在一个机器上运行多个数据节点的实例,但是实际的部署中不会有这种情况。
集群中只有一个名字节点极大地简单化了系统的体系结构。名字节点是仲裁者和所有HDFS元数据的仓库,用户的实际数据不经过名字节点。在集群中,我们一般还会配置Secondary NameNode。这个Secondary NameNode下载NameNode的image文件和editlogs,并对他们做本地归并,最后再将归并完的image文件发回给NameNode。Secondary NameNode并不是NameNode的热备份,在NameNode出故障时并不能工作。
2.1.2 MapReduce(分布式计算框架)
MapReduce是一种编程模型(也称为计算模型),用以大数据量地批处理计算。如图2-2所示,MapReduce的思想是将批量处理的任务主要分成两个阶段(Map和Reduce阶段),所谓的Map阶段就是把数据生成“键-值”对,按键排序。中间有一步叫shuffle,把同样的key运输到同一个reducer上面去。在reducer上,因为都是同一个key,就直接可以做聚合(算出总和),最后把结果输出到HDFS上。对于应用开发者来说,你需要做的就是编写Map和Reduce函数,像中间的排序、shuffle网络传输、容错处理等,框架已经帮你做好了。
图2-2 MapReduce处理示例
MapReduce的思想和人口普查的做法类似。人口普查委员会给每个城市分配人口普查工作人员(Map任务),所有人员并行地统计当地的人口数据,最后各个人员的统计数据归约(reduce任务)到总的人口普查数字。图2-2的例子是计算各个单词出现的次数。MapReduce通常将输入的数据集分割为一些独立的数据块(splitting步骤),然后由一些Map任务(task)在服务器集群上以完全并行的方式进行处理,这些Map任务的计算结果最后通过Reduce任务合并在一起来计算最终的结果。具体来说,Map对数据进行指定的操作,生成“键-值”对形式的中间结果(Mapping步骤,“Deer,1”就是一个键值对,这个中间结果一般存放在文件系统上)。MapReduce框架对中间结果按照键值排序(Shuffling步骤),Reduce则对中间结果中相同“键”的所有“值”进行规约(Reducing步骤),以得到最终结果。最终结果一般也存放在文件系统上(如Final result步骤)。MapReduce框架负责任务的调度和监控,并重新执行这些失败的任务。MapReduce非常适合在大量计算机组成的分布式并行环境里进行数据处理。
图2-3显示了一个MapReduce的使用场景,图中各主要要素说明如下。
图2-3 MapReduce进程示例
●JobTracker:这是主节点,只有一个,它管理所有作业,作业/任务的监控、错误处理等。它将任务分解成一系列的子任务(Map任务、Reduce任务、Shuffle操作),并分派给TaskTracker。
●TaskTracker:这是从节点,可以有多个,它们接收来自JobTracker的Map Task、Reduce Task和Shuffle operations,并执行之。它们与JobTracker交互,汇报任务状态。
●Map Task:解析每条数据记录,传递给用户编写的map(),并执行,最后将输出结果写入本地磁盘(如果为map-only作业,直接写入HDFS)。
●Reduce Task:从Map Task的执行结果中,对数据进行排序,将数据按照分组传递给用户编写的Reduce函数执行。
我们以一个实际的案例来说明MapReduce处理流程。如果我们要统计一下过去60年人民日报出现最多的几个词(未必是一个,可能有好几个词出现的次数一样多),那怎么使用MapReduce来处理呢?我们首先想到的是,可以写一个程序,把所有人民日报按顺序遍历一遍,统计每一个遇到的词的出现次数,最后就可以知道哪几个词最热门了。但是因为人民日报的文章数量很大,这个方法肯定耗时不少。既然MapReduce的本质就是把作业交给多个计算机去完成。那么,我们可以使用上述的程序,部署到N台机器上去,然后把60年的报纸分成N份,一台机器跑一个作业,然后把N个运行结果进行整合。MapReduce本质上就是如此,但是如何拆分60年的报纸文件,如何部署程序到N个机器上,如何整合结果,这都是MapReduce框架定义好的。我们只要定义好这个map和reduce任务,其他都交给MapReduce。
下面我们使用MapReduce伪代码来说明如何实现Map和Reduce两个函数。Map函数和Reduce函数是需要我们自己实现的,这两个函数定义了任务本身。MapReduce计算框架中的输入和输出的基本数据结构是“键-值”对。需要提醒读者的是,我们现在很少直接使用MapReduce框架来编写程序了,而是使用基于MapReduce框架的工具来编写,或者直接用Spark等工具来编写。下面的Map函数和Reduce函数的伪代码用来帮助读者理解整个MapReduce框架的思想。
1.Map函数
接受一个“键-值”对,产生一组中间“键-值”对。MapReduce框架有sort(排序)和shuffle(发送)操作,sort会按照键来对map函数所产生的键-值对进行排序(如图2-2所示的排序结果),然后shuffle将所有具有相同键的“键-值”对发送给同一个reduce函数。
ClassMapper method map(String input_key, String input_value): // input_key: 报纸文件名称 // input_value: 报纸文件内容 for each word w in input_value: EmitIntermediate(w, "1"); //记录每个词的出现(次数为1)
在上面的代码中,map函数接受的键是文件名(假定文件名是日期,如:20160601则表明是2016年6月1日的人民日报电子版文件),值是文件的内容,map函数逐个遍历词语,每遇到一个词w,就产生一个中间“键-值”对<w, "1">,这表示这个w词出现了一次。
2.Reduce函数
接受一个键(一个词),以及相关的一组值(这一组值是所有Map对于这个词计算出来的频数的一个集合),整个输入数据也是一个“键-值”对。将这组值进行合并产生一组规模更小的值(通常只有一个或零个值)。在我们这个例子中,是将值集合中的频数进行求和,然后记录每个词和这个词出现的总频数。
ClassReducer method reduce(String output_key,Iterator intermediate_values): // output_key: 一个词 // intermediate_values: 该词对应的所有出现次数的列表 int result = 0; for each v in intermediate_values: result += ParseInt(v); //次数累加 Emit(AsString(result));//记录最终累加结果
MapReduce将键相同(都是词w)的“键-值”对传给reduce函数,这样reduce函数接受的键就是单词w,值是字符串“1”的列表(键为w的键-值对的个数),然后将这些“1”累加就得到单词w的出现次数。最后存储在HDFS上。
MapReduce支持C/C++、Java、Ruby、Perl和Python编程语言。开发人员可以使用MapReduce库来创建任务。至于节点之间的通信和协调,输入数据集的切割,在不同机器之间的程序执行调度,处理错误等,这些都由框架完成,开发人员无须处理。Map和Reduce函数会自动在多个服务器节点上自动并行执行。即使开发人员完全没有并行和分布式系统的经验和知识,也能轻松地利用好大型分布式系统的资源。MapReduce革新了海量数据计算的方式,为运行在成百上千台机器上的并行程序提供了简单的编程模型。MapReduce几乎可以做到线性扩展:随着数据量的增加,可以通过增加更多的计算机来保持作业时间不变。MapReduce容错性强,它将工作拆分成多个小任务后,能很好地处理任务失败。
2.1.3 YARN(集群资源管理器)
从Hadoop 2开始,MapReduce被一个改进的版本所替代,这个新版本叫做MapReduce 2.0(MRv2)或YARN(Yet Another Resouce Negotiator,另一种资源协调者)。YARN是一种新的Hadoop资源管理器,也是一个通用资源管理系统,可为上层应用提供统一的资源管理和调度,它的引入为集群在利用率、资源统一管理和数据共享等方面带来了巨大好处。我们先来回顾一下老版本的MapReduce的流程和设计思路。从图2-3可以看出:
●首先客户端应用提交了一个job,job的信息会发送到Job Tracker中,Job Tracker是MapReduce框架的中心,它与集群中的机器定时通信(心跳),确定哪些程序在哪些机器上执行,管理所有job失败、重启等操作。
●TaskTracker在MapReduce集群中每台机器都有,主要是监视所在机器的资源情况。
●TaskTracker同时监视当前机器上的tasks运行状况。TaskTracker把这些信息通过心跳发送给JobTracker,JobTracker会搜集这些信息,为新提交的job确定运行在哪些机器上。
MapReduce架构简单明了,在最初推出的几年,得到了众多的成功案例,获得业界广泛的支持和肯定。但随着分布式系统集群的规模及其工作负荷的增长,原框架固有的问题逐渐浮出水面,主要的问题集中如下:
●JobTracker是MapReduce的集中处理点,存在单点故障。
●JobTracker承担了太多的任务,造成了过多的资源消耗,当job非常多的时候,会造成很大的内存开销,也增加了JobTracker崩溃的风险。业界的共识是老版本的MapReduce的上限只能支持4000个节点主机。
●在TaskTracker端,只以map/reduce task的数目作为资源的表示过于简单,没有考虑到CPU和内存的占用情况,如果两个大内存消耗的task被调度到了一块,很容易出现Java的OOM。
●在TaskTracker端,把资源强制划分为map task slot和reduce task slot。当系统中只有map task或者只有reduce task的时候,这会造成资源的浪费,也就是前面提到的集群资源利用的问题。
YARN最初是为了修复MapReduce实现里的明显不足,并对可伸缩性(支持一万个节点和二十万个内核的集群)、可靠性和集群利用率进行了提升。YARN把Job Tracker的两个主要功能(资源管理和作业调度/监控)分成了两个独立的服务程序——全局的资源管理(Resource Manager,简称为RM)和针对每个应用的App Master(AM),这里说的应用要么是传统意义上的MapReduce任务,要么是任务的有向无环图(DAG)。Resource Manager和每一台机器的节点管理服务器(Node Manager)能够管理用户在哪台机器上的进程,并能对计算进行组织。其架构图如图2-4所示。
图2-4 YARN架构
Resource Manager支持分层级的应用队列,这些队列享有集群一定比例的资源。它是一个调度器,可以基于应用程序对资源的需求进行调度。每一个应用程序需要不同类型的资源,因此就需要不同的容器(container)。资源包括:内存、CPU、磁盘、网络等等。可以看出,这同老版本MapReduce的固定类型的资源使用模型有显著区别。资源管理器提供一个调度策略的插件,它负责将集群资源分配给多个队列和应用程序。
图中Node Manager是每一台机器的代理,是执行应用程序的容器,它监控应用程序的资源使用情况(CPU、内存、硬盘、网络)并且向资源管理器汇报。每一个应用的Application Master是一个框架库,它结合从Resource Manager获得的资源和Node Manager协同工作来运行和监控任务。每一个应用的Application Master向资源管理器索要适当的资源容器,运行任务,跟踪应用程序的状态和监控它们的进程,处理任务的失败。
让我们对新旧MapReduce框架做一下比较。首先客户端应用不变,其API大部分保持兼容,这也是为了对开发者透明化,使其不必对原有应用代码做大的修改,但是原框架中核心的JobTracker和TaskTracker不见了,取而代之的是Resource Manager、Application Master与NodeManager三个部分。Resource Manager是一个中心的服务,它是调度和启动每一个Job所属的Application Master,另外监控Application Master的存在情况。在老版本的Job里面所在的task监控和重启在YARN中都不见了,这就是在YARN中出现Application Master的原因。Resource Manager负责作业与资源的调度,接收Job Submitter提交的作业,按照作业的上下文(Context)信息,以及从Node Manager收集来的状态信息,启动调度过程,分配一个Container。Node Manager功能比较专一,就是负责Container状态的维护,并向Resource Manager保持心跳。Application Master负责一个Job生命周期内的所有工作,类似老框架中的JobTracker。但需要注意每一个Job(不是每一种)都有一个Application Master,它可以运行在Resource Manager以外的机器上。
Yarn框架相对于老的MapReduce框架有什么优势呢?首先,这个设计大大减小了JobTracker(也就是YARN的Resource Manager)的资源消耗,并且让监测每一个Job子任务(tasks)状态的程序分布式了。对于资源的表示以内存为单位,比之前以剩余slot数目为单位更合理。老的框架中,JobTracker一个很大的负担就是监控job下tasks的运行状况,现在,这个部分由Application Master完成,而Resource Manager中有一个模块叫做ApplicationsMasters(注意不是ApplicationMaster),它用来监测ApplicationMaster的运行状况,如果出问题,会将其在其他机器上重启。Container是Yarn为了将来作资源隔离而提出的一个框架。这一点应该是借鉴了Mesos的工作机制,虽然Container目前是一个框架,仅仅提供Java虚拟机内存的隔离,但是未来可能会支持更多的资源调度和控制。既然资源表示成内存量,那就没有了之前的map slot/reduce slot分开所造成集群资源闲置的问题。
总之,YARN从某种意义上来说应该算是一个云操作系统,它负责集群的资源管理。在操作系统之上可以开发各类的应用程序。这些应用可以同时利用Hadoop集群的计算能力和丰富的数据存储模型,共享同一个Hadoop集群和驻留在集群上的数据。此外,这些新的框架还可以利用YARN的资源管理器,提供新的应用管理器实现。本书后面介绍的Spark框架就支持YARN。
2.1.4 Zookeeper(分布式协作服务)
Zookeeper是一个集中式服务,主要负责分布式任务调度,它用来完成配置管理、名字服务、提供分布式锁以及集群管理等工作。具体说明如下。
1.配置管理
应用程序中经常有一些配置,比如数据库连接等。一般我们都是使用配置文件的方式,在代码中引入这些配置文件。这种方式适合只有一台服务器的时候。当我们有很多服务器时,就需要寻找一种集中管理配置的方法,而不是在各个服务器上存放配置文件。我们在这个集中的地方修改了配置,所有需要配置的服务都能读取配置。一般我们用一个集群来提供这个配置服务以提升可靠性。
Zookeeper保证了配置在集群中的一致性,它使用Zab这种一致性协议来提供一致性。现在有很多开源项目使用Zookeeper来维护配置,比如在HBase中,客户端就是连接一个Zookeeper,获得必要的HBase集群的配置信息,然后才可以进一步操作。在开源的消息队列Kafka中,也使用Zookeeper来维护broker的信息。
2.名字服务
DNS把域名(如:www.da-shuju.com)对应到IP地址(59.175.137.94),从而为我们提供了名字服务。在应用系统中我们有时也会需要这类名字服务,特别是在服务特别多的时候。我们只需要访问一个共同的地方,它提供统一的入口。
3.分布式锁
Zookeeper是一个分布式协调服务。我们利用Zookeeper来协调多个分布式进程之间的活动。在一个分布式环境中,为了提高可靠性,集群中的每台服务器上都部署着同样的服务。我们使用分布式锁,在某个时刻只让一个服务去干活,当这个服务出问题时就将锁释放,并立即切换到另外的服务上。比如HBase的Master就是采用这种机制。在Zookeeper中是通过选举leader完成的分布式锁。
4.集群管理
在分布式的集群中,经常会由于各种原因,比如硬件故障、软件故障、网络问题,有新的节点加入进来,也有老的节点退出集群。这个时候,集群中其他机器需要感知到这种变化,然后根据这种变化做出对应的决策。比如:一个分布式的SOA架构中,服务是一个集群提供的,当消费者访问某个服务时,就需要确定哪些节点可以提供该服务。Kafka的队列就采用了Zookeeper作为消费者的上下线管理。
总之,Zookeeper就是一种可靠的、可扩展的、分布式的、可配置的协调机制,用来统一分布式系统的状态。
2.1.5 Ambari(管理工具)
Apache Ambari是一种基于Web的Hadoop管理工具,可以快捷地监控、部署、管理Hadoop集群。Ambari目前已支持大多数Hadoop组件,包括HDFS、MapReduce、Hive、Pig、HBase、Zookeper、Sqoop和Hcatalog等。Ambari可以帮助Hadoop系统管理员来完成以下工作:
●通过一步一步的安装向导简化了集群的安装和配置。
●集中管理(包括:启动、停止和重新配置)集群上的Hadoop服务。
●预先配置好关键的运维指标,可以直接查看Hadoop Core(HDFS和MapReduce)及相关项目(如HBase、Hive和HCatalog)是否健康。
●支持作业与任务执行的可视化与分析,能够更好地查看依赖和性能。
●通过一个完整的RESTful API把监控和管理功能嵌入到自己的应用系统中。
●用户界面非常直观,用户可以轻松有效地查看信息并控制集群。
Ambari使用Ganglia收集度量指标,用Nagios支持系统报警,当需要引起管理员的关注时(比如,节点停机或磁盘剩余空间不足等问题),系统将向其发送邮件。此外,Ambari能够安装安全的(基于Kerberos)Hadoop集群,以此实现了对Hadoop安全的支持,提供了基于角色的用户认证、授权和审计功能,并为用户管理集成了LDAP和Active Directory。