1.4 大数据处理框架的四层结构
一个大数据应用可以表示为<;输入数据,用户代码,配置参数>;。应用的输入数据一般以分块(如以128MB为一块)形式预先存储在分布式文件系统(如HDFS[18])之上。用户在向大数据处理框架提交应用之前,需要指定数据存储位置,撰写数据处理代码,并设定配置参数。之后,用户将应用提交给大数据处理框架运行。
大数据处理框架大体可以分为四层结构:用户层、分布式数据并行处理层、资源管理与任务调度层、物理执行层。以Apache Spark框架为例,其四层结构如图1.2所示。在用户层中,用户需要准备数据、开发用户代码、配置参数。之后,分布式数据并行处理层根据用户代码和配置参数,将用户代码转化成逻辑处理流程(数据单元及数据依赖关系),然后将逻辑处理流程转化为物理执行计划(执行阶段及执行任务)。资源管理与任务调度层根据用户提供的资源需求来分配资源容器,并将任务(task)调度到合适的资源容器上运行。物理执行层实际运行具体的数据处理任务。下面具体介绍每个层次的详细信息,以及工业界和学术界进行的一些相关工作。
图1.2 大数据处理框架的四层结构
1.4.1 用户层
用户层方便用户开发大数据应用。如前所述,我们将一个大数据应用表示为<;输入数据,用户代码,配置参数>;。下面介绍用户在开发应用时需要准备的输入数据、用户代码和配置参数。
1.输入数据
对于批式大数据处理框架,如Hadoop、Spark,用户在提交作业(job)之前,需要提前准备好输入数据。输入数据一般以分块(如以128MB为一块)的形式预先存储,可以存放在分布式文件系统(如Hadoop的分布式文件系统HDFS)和分布式Key-Value数据库(如HBase[19])上,也可以存放到关系数据库中。输入数据在应用提交后会由框架进行自动分块,每个分块一般对应一个具体执行任务(task)。
对于流式大数据处理框架,如Spark Streaming[20]和Apache Flink[21],输入数据可以来自网络流(socket)、消息队列(Kafka)等。数据以微批(多条数据形成一个微批,称为mini-batch)或者连续(一条接一条,称为continuous)的形式进入流式大数据处理框架。
对于大数据应用,数据的高效读取常常成为影响系统整体性能的重要因素。为了提高应用读取数据的性能,学术界研究了如何通过降低磁盘I/O来提高性能。例如,PACMan[22]根据一定策略提前将task所需的部分数据缓存到内存中,以提高task的执行性能。为了加速不同的大数据应用(如Hadoop、Spark等)之间的数据传递和共享,Tachyon[23](现在更名为Alluxio[24])构造了一个基于内存的分布式数据存储系统,用户可以将不同应用产生的中间数据缓存到Alluxio中,而不是直接缓存到框架中,这样可以加速中间数据的写入和读取,同时也可以降低框架的内存消耗。
2.用户代码
用户代码可以是用户手写的MapReduce代码,或者是基于其他大数据处理框架的具体应用处理流程的代码。图1.3展示了在Hadoop MapReduce上实现WordCount的用户代码,其用于计算字符出现的次数。在Hadoop MapReduce上用户需要自定义map()和reduce()函数。除了map()和reduce()函数,用户为了优化应用性能还定义了一个“迷你”的reduce(),叫作combine()。combine()可以在reduce()执行之前对中间数据进行聚合,这样可以减少reduce()从各个节点获取的输入数据量,进而减少网络I/O开销和reduce()的压力。combine()和reduce()的代码实现一般是相同的。Hadoop MapReduce提供的map()和reduce()函数的处理逻辑比较固定单一,难以支持复杂数据操作,如常见的排序操作sort()、数据库表的关联操作join()等。为此,Dryad和Spark提供了更加通用的数据操作符,如flatMap()等。图1.4展示了在Spark上实现WordCount的用户代码,对于同样的应用处理逻辑,基于Spark的用户代码比基于Hadoop MapReduce的用户代码要更加简洁。
图1.3 在Hadoop MapReduce上实现WordCount的用户代码
图1.4 在Spark上实现WordCount的用户代码
在实际系统中,用户撰写用户代码后,大数据处理框架会生成一个Driver程序,将用户代码提交给集群运行。例如,在Hadoop MapReduce中,Driver程序负责设定输入/输出数据类型,并向Hadoop MapReduce框架提交作业;在Spark中,Driver程序不仅可以产生数据、广播数据给各个task,而且可以收集task的运行结果,最后在Driver程序的内存中计算出最终结果。图1.5展示了在Spark平台上Driver程序的运行模式。
图1.5 在Spark平台上Driver程序的运行模式
除了直接依赖底层操作手动撰写用户代码,用户还可以利用高层语言或者高层库来间接产生用户代码。例如,在图1.6中用户可以使用类似SQL的Apache Pig脚本自动转化生成Hadoop MapReduce代码。通过这种方式生成的代码是二进制的,map()和reduce()等函数代码不可见。一些高层库还提供了更简单的方式生成用户代码,如使用Spark之上的机器学习库MLlib时,用户只需要选择算法和设置算法参数,MLlib即可自动生成可执行的Spark作业了。
图1.6 使用类似SQL的Apache Pig脚本自动转化生成Hadoop MapReduce代码
除了Apache Pig和MLlib,工业界和学术界也提出了很多更简单、更通用的高层语言和高层库,使用户不用手写较为烦琐的map()和reduce()代码。Google提出了FlumeJava[25],可以将多个MapReduce 作业以流水线(pipeline)的形式串联起来,并提供了基本的数据操作符,如group()、join(),使常见的编程任务变得简单。Cascading[26]是用Java编写的,它是构建在Hadoop之上的一套数据操作函数库。与FlumeJava类似,Cascading同样为用户提供了基本的数据操作符,可以方便用户构建出较为复杂的数据流程。Google设计的Sawzall[27]是一种用于数据查询的脚本语言,偏向统计分析。Sawzall脚本可以自动转化为MapReduce作业执行,使得分析人员不用直接写MapReduce程序就可以进行大数据分析。Google还设计了Tenzing[28],该模块构建在MapReduce框架之上,支持SQL查询语言,并实现高效、低延迟的数据查询服务。微软研究院也设计了自己的用户层语言DryadLINQ[29]和SCOPE[30]。DryadLINQ将针对数据对象操作的LINQ程序转化成Dryad任务,再利用Dryad框架来并行处理数据。SCOPE与Sawzall在一个层次上,可以将SQL脚本转化成Dryad DAG任务,同样利用Dryad框架来并行处理数据。SCOPE和Dryad是使用C#/C++实现的。
在用户代码的优化方面,PeriSCOPE[31]根据job piepeline的拓扑结构对用户代码采用类似编译的优化措施,自动优化运行在SCOPE上的job性能。
3.配置参数
一个大数据应用可以有很多配置参数,如Hadoop支持200多个配置参数。这些配置参数可以分为两大类:一类是与资源相关的配置参数。例如,buffer size定义框架缓冲区的大小,影响map/reduce任务的内存用量。在Hadoop中,map/reduce任务实际启动一个JVM来运行,因此用户还要设置JVM的大小,也就是heap size。在Spark中,map/reduce任务在资源容器(Executor JVM)中以线程的方式执行,用户需要估算应用的资源需求量,并设置应用需要的资源容器个数、CPU个数和内存大小。
另一类是与数据流相关的配置参数。例如,Hadoop和Spark中都可以设置partition()函数、partition个数和数据分块大小。partition()函数定义如何划分map()的输出数据。partition个数定义产生多少个数据分块,也就是有多少个reduce任务会被运行。数据分块大小定义map任务的输入数据大小。
由于Hadoop/Spark框架本身没有提供自动优化配置参数的功能,所以工业界和学术界研究了如何通过寻找最优配置参数来对应用进行性能调优。StarFish[32]研究了如何选择性能最优的Hadoop应用配置参数,其核心是一个Just-In-Time的优化器,该优化器可以对Hadoop应用的历史运行信息进行分析,并根据分析结果来预测应用在不同配置参数下的执行时间,以选择最优参数。Verma等[33,34]讨论了在给定应用完成时限的情况下,如何为Hadoop应用分配最佳的资源(map/reduce slot)来保证应用能够在给定时限内完成。DynMR[35]通过调整任务启动时间、启动顺序、任务个数来减少任务等待时间和由于过早启动而引起的任务之间的资源竞争。MROnline[36]根据任务执行状态,使用爬山法寻找最优的缓冲区大小和任务内存大小,以减少应用执行时间。Xu等[37]研究了如何离线估计MapReduce应用内存用量,采用的方法是先用小样本数据运行应用,然后根据应用运行信息来估算应用在大数据上的实际内存消耗。SkewTune[38]可以根据用户自定义的代价函数来优化数据划分算法,在保持数据输入顺序的同时,减少数据倾斜问题。
1.4.2 分布式数据并行处理层
分布式数据并行处理层首先将用户提交的应用转化为较小的计算任务,然后通过调用底层的资源管理与任务调度层实现并行执行。
在Hadoop MapReduce中,这个转化过程是直接的。因为MapReduce具有固定的执行流程(map—Shuffle—reduce),可以直接将包含map/reduce函数的作业划分为map和reduce两个阶段。map阶段包含多个可以并行执行的map任务,reduce阶段包含多个可以并行执行的reduce任务。map任务负责将输入的分块数据进行map()处理,并将其输出结果写入缓冲区,然后对缓冲区中的数据进行分区、排序、聚合等操作,最后将数据输出到磁盘上的不同分区中。reduce任务首先将map任务输出的对应分区数据通过网络传输拷贝到本地内存中,内存空间不够时,会将内存数据排序后写入磁盘,然后经过归并、排序等阶段产生reduce()的输入数据。reduce()处理完输入数据后,将输出数据写入分布式文件系统中。
与Hadoop MapReduce不同,Spark上应用的转化过程包含两层:逻辑处理流程、执行阶段与执行任务划分。如图1.7所示,Spark 首先根据用户代码中的数据操作语义和操作顺序,将代码转化为逻辑处理流程。逻辑处理流程包含多个数据单元和数据依赖,每个数据单元包含多个数据分块。然后,框架对逻辑处理流程进行划分,生成物理执行计划。该计划包含多个执行阶段(stage),每个执行阶段包含若干执行任务(task)。微软的大数据编程框架DryadLINQ也提供类似的编译过程,可以将用户编写的大数据应用程序(LINQ)编译为可分布运行的Dryad执行计划和任务。
为了将用户代码转化为逻辑处理流程,Spark和Dryad对输入/输出、中间数据进行了更具体的抽象处理,将这些数据用一个统一的数据结构表示。在Spark中,输入/输出、中间数据被表示成RDD(Resilient Distributed Datasets,弹性分布式数据集)。在RDD上可以执行多种数据操作,如简单的map(),以及复杂的cogroup()、join()等。一个RDD可以包含多个数据分区(partition)。parent RDD和child RDD之间通过数据依赖关系关联,支持一对一和多对一等数据依赖关系。数据依赖关系的类型由数据操作的类型决定。如图1.7所示,逻辑处理流程是一个有向无环图(Directed Acyclic Graph,简称DAG图),其中的节点是数据单元RDD,每个数据单元里面的圆形是指RDD的多个数据分块,正方形专指输入数据分块。箭头是在RDD上的一些数据操作(也隐含了parent RDD和child RDD之间的依赖关系)。
图1.7 Spark应用转化与执行流程
为了将逻辑处理流程转化为物理执行计划,Spark首先根据RDD之间的数据依赖关系,将整个流程划分为多个小的执行阶段(stage)。例如,图1.7中逻辑处理流程被划分为3个执行阶段。之后,在每个执行阶段形成计算任务(task),计算任务的个数一般与RDD中分区的个数一致。与MapReduce不同的是,一个Spark job可以包含很多个执行阶段,而且每个执行阶段可以包含多种计算任务,因此并不能严格地区分每个执行阶段中的任务是map任务还是reduce任务。另外,在Spark中,用户可以通过调用cache() 接口使框架缓存可被重用的中间数据。例如,当前job的输出可能会被下一个job用到,那么用户可以使用cache()对这些数据进行缓存。
1.4.3 资源管理与任务调度层
从系统架构上讲,大数据处理框架一般是主-从(Master-Worker)结构。主节点(Master节点)负责接收用户提交的应用,处理请求,管理应用运行的整个生命周期。从节点(Worker节点)负责执行具体的数据处理任务(task),并在运行过程中向主节点汇报任务的执行状态。以Hadoop MapReduce为例(见图1.8),在主节点运行的JobTracker进程首先接收用户提交的job,然后根据job的输入数据和配置等信息将job分解为具体的数据处理任务(map/reduce task),最后将task交给任务调度器调度运行。任务调度器根据各个从节点的资源总量与资源使用情况将map/reduce task分发到合适的从节点的TaskTracker中。TaskTracker进程会为每个map/reduce task启动一个进程(在Hadoop MapReduce中是JVM进程)执行task的处理步骤。每个从节点可以同时运行的task数目由该节点的CPU个数等资源状况决定。
图1.8 Hadoop MapReduce框架的部署图,其中不同job的task可以分布在不同机器上
另外,大数据处理服务器集群一般由多个用户共享。如果多个用户同时提交了job且集群资源充足,那么集群会同时运行多个job,每个job包含多个map/reduce task。在图1.8中,Worker 1节点上运行了3个map task和1个reduce task,而Worker 2节点上运行了2个map task和1个reduce task。Worker 1节点上运行的map task和Worker 2节点上运行的map task可以分别属于不同的job。
Spark支持不同的部署模式,如Standalone部署模式、YARN部署模式和Mesos部署模式等。其中Standalone部署模式与Hadoop MapReduce部署模式基本类似,唯一区别是Hadoop MapReduce部署模式为每个task启动一个JVM进程运行,而且是在task将要运行时启动JVM,而Spark是预先启动资源容器(Executor JVM),然后当需要执行task时,再在Executor JVM里启动task线程运行。
在运行大数据应用前,大数据处理框架还需要对用户提交的应用(job)及其计算任务(task)进行调度。任务调度的主要目的是通过设置不同的策略来决定应用或任务获得资源的先后顺序。典型的任务调度器包含先进先出(FIFO)调度器、公平(Fair)调度器等。先进先出(FIFO)的任务调度器如图1.9所示,其有两种类型的调度器:一类是应用调度器,决定多个应用(app)执行的先后顺序;另一类是任务调度器,决定多个任务(task)执行的先后顺序。例如,Spark中一个stage可能包含多个map task,任务调度器可以根据数据本地化等信息决定这些task应调度到的执行节点。
图1.9 先进先出(FIFO)的任务调度器
传统的资源管理与任务调度层只针对某一种类型的应用进行资源管理和任务调度,而有一些新的研究对此进行了拓展。例如,UC Berkeley提出将资源管理和任务调度模块构造为一个统一的集群资源管理系统,称为“集群操作系统”[39]。该系统可以集中调度多个不同大数据处理框架中的job。再例如,第二代Hadoop的资源管理与调度框架YARN[40]能够同时为集群中运行的多种框架(如Hadoop MapReduce,Spark)提供资源管理等服务。用户可以直接将应用提交给YARN,并且在提交应用时指定应用的资源需求,如CPU个数和内存空间大小等。UC Berkeley提出的Mesos[41]与YARN类似,可以对集群上各种应用进行资源分配与任务调度,支持MapReduce作业、Spark作业、MPI[42]作业等。尽管YARN和Mesos提供了比较成熟的资源管理策略,可以统一分配、管理和回收不同节点上的计算资源。然而,它们有一个共同的局限,即资源分配策略的执行依赖用户提供的资源需求与当前集群资源的监控信息,而不能根据应用的实际场景自动动态地调整资源分配。
1.4.4 物理执行层
大数据处理框架的物理执行层负责启动task,执行每个task的数据处理步骤。在Hadoop MapReduce中,一个应用需要经历map、Shuffle、reduce 3个数据处理阶段。而在Spark中,一个应用可以有更多的执行阶段(stage),如迭代型应用可能有几十个执行阶段,每个执行阶段也包含多个task。另外,这些执行阶段可以形成复杂的DAG图结构。在物理执行时首先执行上游stage中的task,完成后执行下游stage中的task。
在Hadoop MapReduce中,每个task对应一个进程,也就是说每个task以JVM(Java虚拟机)的方式来运行,所以在Hadoop MapReduce中task的内存用量指的是JVM的堆内存用量。在Spark中,每个task对应JVM中的一个线程,而一个JVM可能同时运行了多个task,因此JVM的内存空间由task共享。在应用未运行前,我们难以预知task的内存消耗和执行时间,也难以预知JVM中的堆内存用量。
从应用特点来分析,我们可以将task执行过程中主要消耗内存的数据分为以下3类。
(1)框架执行时的中间数据。例如,map()输出到缓冲区的数据和reduce task在Shuffle阶段暂存到内存中的数据。
(2)框架缓存数据。例如,在Spark中,用户调用cache()接口缓存到内存中的数据。
(3)用户代码产生的中间计算结果。例如,用户代码调用map()、reduce()、combine(),在处理输入数据时会在内存中产生中间计算结果。
很多大数据处理框架在设计时就考虑了内存的使用问题,并进行了相应的优化设计。例如,Spark框架是基于内存计算的,它将大量的输入数据和中间数据都缓存到内存中,这种设计能够有效地提高交互型job和迭代型job的执行效率。
由于大数据应用的内存消耗量很大,所以当前许多研究关注如何改进大数据处理框架的内存管理机制,以减少应用内存消耗。例如,UCSD提出了ThemisMR [43],重新设计了MapReduce的数据流及内存管理方案,有效地将中间数据磁盘读写次数降低为两次,从而提高了job的执行性能。Tachyon构造了一个基于内存的分布式数据存储系统,主要用于在不同Hadoop/Spark应用之间共享数据。用户可以将不同应用产生的中间数据缓存到Tachyon中而非直接缓存到框架中,以降低框架的内存消耗。FAÇADE[44]提供了用于降低用户代码内存消耗的代码编译和执行环境。FAÇADE的设计目的是将数据存储和数据操作分开,方法是将数据存放到JVM的堆外内存中,将对堆内对象的数据操作转化为对FAÇADE的函数调用。对于Java对象本身产生的overhead(也就是Java对象自身所需的header和reference),Bu等[45]提出了一些优化方法,如将大量数据对象(record object)合并为少量的大的数据对象。Lu等[46]提出了基于对象生命周期的内存管理机制,可以根据数据对象类型和生命周期,将对象分配到不同队列进行分配和回收。Xu等[47]针对Hadoop/Spark等大数据框架经常出现的垃圾回收时间长、频繁等问题,通过实验分析主流Java垃圾回收算法在大数据环境下存在的性能缺陷,提出了垃圾回收算法的3种改进方法。Yak[48]提出了一种混合GC算法,将堆内存划分为控制流区域和数据流区域,前者使用传统GC算法回收控制流代码的内存对象,后者使用基于时域区域(epoch-based region)的内存管理,并根据数据对象生命周期来回收内存。Spark社区采用堆外内存管理机制和基于堆外内存的Shuffle机制,提出了钨丝计划[49]。
另外,如何预测大数据应用的执行时间也被一些研究人员关注。如果能够预测出job的执行时间可以为任务调度器提供决策依据,则方便用户了解job的执行进度。华盛顿大学的研究人员提出了KAMD [50]和ParaTimer [51],可以根据job执行的历史信息并结合正在运行的job处理的数据量,使用启发式方法来估算job剩余的执行时间。UIUC的研究人员提出了ARIA [33],细粒度地分析了单个MapReduce job的执行阶段,并提出了基于上下界的时间估算公式,可以通过job的历史信息或调试信息来估算执行时间。华盛顿大学的研究人员后来又提出了PerfXplain [52],通过对比两个包含同样处理逻辑的job的性能指标,来解释两个job执行效率不同的原因。