第3章
Spark程序开发
致虚极,守静笃。万物并作,吾以观复。
——《道德经》第十六章
这世间,一切原本都是空虚而宁静的,万物也因而能够在其中生长。因此,要追寻万物的本质,必须恢复其最原始的虚静状态,只有致虚和守静做到极笃的境地,万物才能蓬勃生长,往复循环。
作为程序员,怎么提倡超越都不为过,但落地到具体问题,我们需要有比较实际的措施。从简单程序开始,以致虚和守静的心态,清空自己在大数据方向不劳而获的幻想,逐步成长为业内有影响力的角色。对于大部分程序员而言,本章内容略显基础,首先通过Spark交互Shell来介绍Spark API,编写简单的Spark程序,然后展示如何构建Spark开发环境,以及编写简单的Spark案例程序,并提交应用程序。
3.1 使用Spark Shell编写程序
要学习Spark程序开发,建议首先通过spark-shell交互式学习,加深对Spark程序开发的理解。spark-shell提供了一种学习API的简单方式,以及一个能够交互式分析数据的强大工具,在Scala语言环境下(Scala运行于Java虚拟机,因此能有效使用现有的Java库)或Python语言环境下均可使用。
3.1.1 启动Spark Shell
在spark-shell中,已经创建了一个名为sc的SparkContext对象,如在4个CPU核上运行bin/spark-shell,命令如下:
./bin/spark-shell --master local[4]
如果指定Jar包路径,命令如下:
./bin/spark-shell --master local[4] --jars testcode.jar
其中,--master用来设置context将要连接并使用的资源主节点,master的值是Standalone模式的Spark集群地址、Mesos或YARN集群的URL,或者是一个local地址;使用--jars可以添加Jar包的路径,使用逗号分隔可以添加多个包。进一步说明,spark-shell的本质是在后台调用了spark-submit脚本来启动应用程序。
3.1.2 加载text文件
Spark创建sc之后,可以加载本地文件创建RDD,我们以加载Spark自带的本地文件README.md文件进行测试,返回一个MapPartitionsRDD文件。
scala>val textFile= sc.textFile("file:///$SPARK_HOME/README.md") textFile: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[3] at textFile at <console>:21
需要说明的是,加载HDFS文件和本地文件都是使用textFile,区别是添加前缀(hdfs://和f ile://)进行标识,从本地文件读取文件直接返回MapPartitionsRDD,而从HDFS读取的文件先转成HadoopRDD,然后隐式转换成MapPartitionsRDD。
上面所说的MapPartitionsRDD和HadoopRDD都是基于Spark的弹性分布式数据集(RDD)。
3.1.3 简单RDD操作
对于RDD,可以执行Transformation返回新RDD,也可以执行Action得到返回结果。下面从f irst和count命令开始Action之旅,示例代码如下:
// 获取RDD文件textFile的第一项 scala>textFile.f irst() res0: String = # Apache Spark // 获取RDD文件textFile所有项的计数 scala>textFile.count() res1: Long = 98
接下来通过Transformation操作,使用f ilter命令返回一个新的RDD,即抽取文件全部条目的一个子集,返回一个新的FilteredRDD,示例代码如下:
// 抽取含有"Spark"的子集
scala>valtext Filter = textFile.filter(line >line.contains("Spark")) textFilter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:23
我们可以链接多个Transformation和Action进行操作。示例代码如下:
scala>textFile.filter(line =>line.contains("Spark")).count() res2: Long = 18
3.1.4 简单RDD操作应用
通过简单RDD操作进行组合,来实现找出文本中每行最多单词数,词频统计等。
1.找出文本中每行最多单词数
基于RDD的Transformation和Action可以用作更复杂的计算,假设想要找到文本中每行最多单词数,可以参考如下代码:
scala>textFile.map(line =>line.split(" ").size).reduce((a, b) => if (a > b) a else b) res3: Int = 14
在上面这段代码中,首先将textFile每一行文本中的句子使用split(" ")进行分词,并统计分词后的单词数。创建一个基于单词数的新RDD,然后针对该RDD执行Reduce操作使用(a, b) => if (a > b) a else b函数进行比较,返回最大值。
2.词频统计
从MapReduce开始,词频统计已经成为大数据处理最流行的入门程序,类似MapReduce,Spark也能很容易地实现MapReduce,示例程序如下:
// 结合f latMap、map和reduceByKey来计算文件中每个单词的词频 scala>val wordCount= textFile.flatMap(line =>line.split(" ")).map(word => (word,1)). reduceByKey((a, b) => a + b) wordCount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[7] at reduceByKey at <console>:23 // 使用collect聚合单词计数结果 scala>wordCount.collect() res4: Array[(String, Int)] = Array((package,1), (this,1) ,...
这里,结合f latMap、Map和reduceByKey来计算文件中每个单词的词频,并返回(string、int)类型的键值对ShuffledRDD(由于reduceByKey执行时需要进行Shuffle操作,返回的是一个Shuffle形式的RDD,ShuffledRDD),最后使用Collect聚合单词计数结果。
如果想让Scala的函数文本更简洁,可以使用占位符“_”,占位符可以看作表达式里需要被“填入”的“空白”,这个“空白”在每次函数被调用时,由函数的参数填入。
当每个参数在函数文本中最多出现一次的情况下,可以使用_+_扩展成带两个参数的函数文本;多个下划线指代多个参数,而不是单个参数的重复使用。第一个下划线代表第一个参数,第二个下划线代表第二个参数,依此类推。
下面通过占位符对词频统计进行优化。
scala>val wordCount=textFile.flatMap(_.split(" ")).map(_,1) .reduceByKey(_+_)
Spark默认是不进行排序的,如果以排序的方法进行输出,需要进行key和value互换,然后采取sortByKey的方式,可以指定降序(false)和升序(true)。这样就完成了数据统计和排序,具体代码如下:
scala>val wordCount= inFile.flatMap(_.split(" ")).map(_, 1).reduceByKey(_+_). map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))
上面的代码通过第一个x=>(x._2,x._1)实现key和value互换,然后通过sortByKey(false)实现降序排列,通过第二个x=>(x._2,x._1)再次实现key和value互换,最终实现排序功能。
3.1.5 RDD缓存
Spark也支持将数据集存进一个集群的内存缓存中,当数据被反复访问时,如在查询一个小而“热”数据集,或运行像PageRank的迭代算法时,是非常有用的。举一个简单的例子,缓存变量textFilter(即包含字符串“Spark”的数据集),并针对缓存计算。
scala>textFilter.cache() res5: textFilter.type = MapPartitionsRDD[2] at filter at <console>:23 scala>textFilter.count() res6: Long = 18
通过cache缓存数据可以用于非常大的数据集,支持跨越几十或几百个节点。