大数据技术入门
上QQ阅读APP看书,第一时间看更新

2.2 Spark(内存计算框架)

随着大数据的发展,人们对大数据的处理要求也越来越高,原有的批处理框架MapReduce适合离线计算,却无法满足实时性要求较高的业务,如实时推荐、用户行为分析等。因此,Hadoop生态系统又发展出以Spark为代表的新计算框架。相比MapReduce,Spark速度快,开发简单,并且能够同时兼顾批处理和实时数据分析。

Apache Spark是加州大学伯克利分校的AMPLabs开发的开源分布式轻量级通用计算框架,并于2014年2月成为Apache的顶级项目。由于Spark基于内存设计,使得它拥有比Hadoop更高的性能,并且对多语言(Scala、Java、Python)提供支持。Spark有点类似Hadoop MapReduce框架。Spark拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS(MapReduce的中间结果要放在文件系统上),因此,在性能上,Spark能比MapReduce框架快100倍左右(如图2-5所示),排序100TB的数据只需要20分钟左右。正是因为Spark主要是在内存中执行,所以Spark对内存的要求非常高,一个节点通常需要配置24GB的内存。在业界,我们有时把MapReduce称为批处理计算框架,把Spark称为实时计算框架、内存计算框架或流式计算框架。

图2-5 性能比较(数据来源:http://spark.apache.org/)

Hadoop使用数据复制来实现容错性(I/O高),而Spark使用RDD(Resilient Distributed Datasets,弹性分布式数据集)数据存储模型来实现数据的容错性。RDD是只读的、分区记录的集合。如果一个RDD的一个分区丢失,RDD含有如何重建这个分区的相关信息。这就避免了使用数据复制来保证容错性的要求,从而减少了对磁盘的访问。通过RDD,后续步骤如果需要相同数据集时就不必重新计算或从磁盘加载,这个特性使得Spark非常适合流水线式的处理。

虽然Spark可以独立于Hadoop来运行,但是Spark还是需要一个集群管理器和一个分布式存储系统。对于集群管理,Spark支持Hadoop YARN、Apache Mesos和Spark原生集群。对于分布式存储,Spark可以使用HDFS、Casandra、OpenStack Swift和Amazon S3。Spark支持Java、Python和Scala(Scala是Spark最推荐的编程语言,Spark和Scala能够紧密集成,Scala程序可以在Spark控制台上执行)。应该说,Spark紧密集成Hadoop生态系统中的上述工具。Spark可以与Hadoop上的常用数据格式(如:Avro和Parquet)进行交互,能读写HBase等NoSQL数据库,它的流处理组件Spark Streaming能连续从Flume和Kafka之类的系统上读取数据,它的SQL库Spark SQL能和Hive Metastore交互。

Spark可用来构建大型的、低延迟的数据分析应用程序。如图2-6所示,Spark包含了如下的库:Spark SQL,Spark Streaming,MLlib(用于机器学习)和GraphX。其中Spark SQL和Spark Streaming最受欢迎,大概60%左右的用户在使用这两个中的一个。而且Spark还能替代MapReduce成为Hive的底层执行引擎。

图2-6 Spark组件

Spark的内存缓存使它适合于迭代计算。机器学习算法需要多次遍历训练集,可以将训练集缓存在内存里。在对数据集进行探索时,数据科学家可以在运行查询的时候将数据集放在内存,这样就节省了访问磁盘的开销。

虽然Spark目前被广泛认为是下一代Hadoop,但是Spark本身的复杂性也困扰着开发人员。Spark的批处理能力仍然比不过MapReduce,Spark SQL和Hive的SQL功能相比还有一定的差距,Spark的统计功能与R语言还没有可比性。

2.2.1 Scala

Spark框架是用Scala开发的,并提供了Scala语言的一个子集。那么,什么是Scala呢?Scala是一种类似Java的编程语言,它的设计初衷是创造一种更好地支持组件的语言。Scala的编译器把源文件编译成Java的class文件,从而让Scala程序运行在JVM上。Scala兼容现有的Java程序,从Scala中可调用所有的Java类库。Scala能够让我们花更少的时间和更少的代码编写一样功能的Java程序。在JVM上,Scala代码多了一个运行库scala-library.jar。

Scala支持交互式运行,开发人员无须编译就能运行这个代码。比如:键入下列Scala代码,然后按Enter键:

    scala> println("Hello, Scala!");

这将产生以下结果:

    Hello, Scala!

Scala和Java间的最大语法的区别在于“;”(行结束符)是可选的。其他都非常类似。下面我们来编写一个简单的Scala代码,用于打印简单的一句话:“Hello, World!”。

    object HelloWorld {
       def main(args: Array[String]) {
          println("Hello, world!") // prints Hello World
       }
    }

其中的“def main(args: Array[String])”是Scala程序的main(),这是每一个Scala程序的入口部分。我们将上述代码保存为HelloWorld.scala文件,然后输入“scalac HelloWorld.scala”编译该代码。这将在当前目录中生成几个类文件。其中一个名称为HelloWorld.class。这是一个可以运行在Java虚拟机(JVM)上的字节码。键入“scala HelloWorld”来运行程序,就可以在窗口上看到“Hello, World!”。

Spark框架是用Scala语言编写的,在使用Saprk时,采用与底层框架相同的编程语言有很多好处:

●性能开销小;

●能用上Spark最新的版本;

●有助于你更理解Spark的原理。

感兴趣的读者可以参考《Scala编程思想》一书深入学习Scala编程。

2.2.2 Spark SQL

Spark的存在是为了以快于MapReduce的速度进行分布式计算。Spark的设计者很快就了解到,大家还是想要用SQL来访问数据,于是Spark SQL就出现了。Spark SQL是基于Spark引擎对HDFS上的数据集或已有的RDD执行SQL查询。有了Spark SQL就能在Spark程序里用SQL语句操作数据了。比如:

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val persons = sqlContext.sql("SELECT name FROM people WHERE age >= 18 AND age <= 29")

上述两行代码是Scala的语法。这两行都声明了两个新变量。与Java不同的是,Scala在变量声明时不给定变量类型。这个功能在Scala编程语言中称为类型推断。Scala会从上下文中分析出变量类型。只要在Scala中定义新变量,必须在变量名称之前加上val或var。带有val的变量是不可变变量,一旦给不可变变量赋值,就不能改变。而以var开头的变量则可以改变值。

Spark SQL在Spark圈中非常流行。Spark SQL的前身是Shark。我们简短回顾一下Shark的整个发展历史。对于熟悉RDBMS但又不理解MapReduce的技术人员来说,Hive提供快速上手的工具,它是第一个运行在Hadoop上的SQL工具。Hive基于MapReduce,但是MapReduce的中间过程消耗了大量的I/O,影响了运行效率。为了提高在Hadoop上的SQL的效率,一些工具开始产生,其中表现较为突出的是:MapR的Drill、Cloudera的Impala、Shark。其中Shark是伯克利实验室Spark生态环境的组件之一,它修改了内存管理、物理计划、执行三个模块,并使之能运行在Spark引擎上,从而使得SQL查询的速度得到10~100倍的提升。Shark依赖于Hive,比如:Shark采用Hive的语法解析器和查询优化器,这制约了Spark各个组件的相互集成,所以提出了Spark SQL项目。2014年6月1日,Shark项目组宣布停止对Shark的开发,将所有资源放在Spark SQL项目上。Spark SQL作为Spark生态的一员继续发展,而不再受限于Hive,只是兼容Hive。Spark SQL体系架构如图2-7所示。

图2-7 Spark SQL体系架构

Spark SQL抛弃原有Shark的代码,汲取了Shark的一些优点,如内存列存储(In-Memory Columnar Storage)、Hive兼容性等,重新开发了Spark SQL代码。由于摆脱了对Hive的依赖性,Spark SQL无论在数据兼容、性能优化、组件扩展方面都得到了极大的方便。在数据兼容方面,Spark不但兼容Hive,还可以从RDD、parquet文件、JSON文件中获取数据,未来版本甚至支持获取RDBMS数据以及cassandra等NOSQL数据。在性能优化方面,除了采取内存列存储、字节码生成技术(bytecode generation)等优化技术外,将会引进Cost Model对查询进行动态评估、获取最佳物理计划等等。在组件扩展方面,无论是SQL的语法解析器还是优化器,都可以重新定义并进行扩展。

2.2.3 Spark Streaming

Spark Streaming是基于Spark引擎对数据流进行不间断处理。只要有新的数据出现,Spark Streaming就能对其进行准实时(数百毫秒级别的延时)的转换和处理。Spark Streaming的工作原理是在小间隔里对数据进行汇集从而形成小批量,然后在小批量数据上运行作业。

使用Spark Streaming编写的程序与编写Spark程序非常相似,在Spark程序中,主要通过操作RDD提供的接口,如map、reduce、filter等,实现数据的批处理。而在Spark Streaming中,则通过操作DStream(表示数据流的RDD序列)提供的接口,这些接口和RDD提供的接口类似。下面我们来看一个应用案例。

假定有一个电商网站,它买了几个搜索引擎(如:百度)的很多关键词。当用户在各大搜索引擎上搜索数据时,搜索引擎会根据购买的关键字导流到电商网站的相关产品页面上,用户可能会购买这些产品。现在需要分析的是哪些搜索词带来的订单比较多,然后根据分析结果多投放这些转化率比较高的关键词,从而为电商网站带来更多的收益。

原先的做法是每天凌晨分析前一天的日志数据,这种方式实时性不高,而且由于日志量比较大,单台机器处理已经达到了瓶颈。现在选择了使用Spark Streaming+Kafka+Flume来处理这些日志,并且运行在YARN上以应对遇到的问题。

如图2-8所示,业务日志分布在各台服务器上。由于业务量比较大,所以日志都是按小时切分的,我们采用Flume实时收集这些日志(图中步骤1),然后发送到Kafka集群(图中步骤2)。这里为什么不直接将原始日志直接发送到Spark Streaming呢?这是因为,如果Spark Streaming挂掉了,也不会影响到日志的实时收集。

图2-8 Spark Streaming应用案例

日志实时到达Kafka集群后,我们再通过Spark Streaming实时地从Kafka拉数据(图中步骤3),然后解析日志,并根据一定的逻辑过滤数据和分析订单和搜索词的关联性。我们使用Spark的KafkaUtils.createDirectStream API从Kafka中拉数据,代码片段如下:

    val sparkConf = new SparkConf().setAppName("OrderSpark")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(2))
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerAddress,   "group.id" -> groupId)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder,   StringDecoder](ssc, kafkaParams, Set(topic))

在上述代码中返回的messages是一个刚刚创建DStream,它是对RDD的封装,其上的很多操作都类似于RDD。createDirectStream函数是Spark 1.3.0开始引入的,其内部实现是调用Kafka的低层次API,Spark本身维护Kafka偏移量等信息,所以可以保证数据零丢失。

为了能够在Spark Streaming程序挂掉后又能从断点处恢复,我们每隔2秒进行一次Checkpoint,这些Checkpoint文件存储在HDFS上(图中步骤4)的Checkpoint目录中。我们可以在程序里面设置Checkpoint目录:

    ssc.checkpoint(checkpointDirectory)

如果我们需要从Checkpoint目录中恢复,我们可以使用StreamingContext中的getOrCreate函数。为了让分析结果共享给其他系统使用,我们将分析后的数据重新发送到Kafka(图中步骤5)。最后,我们单独启动了一个程序从Kafka中实时地将分析好的数据存到MySQL中用于持久化存储(图中步骤6)。