RDD 编程基础
RDD 创建
从文件系统中加载数据创建 RDD
Spark 采用 textFile()
方法来从文件系统中加载数据创建 RDD
该方法把文件的 URI
作为参数,这个 URI
可以是
- 本地文件系统的地址
- 分布式文件系统
HDFS
的地址 - AmazonS3 的地址
- 等等
从本地文件系统中加载数据创建 RDD
>>> lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") >>> lines.foreach(print) Hadoop is good Spark is fast Spark is better
|
从分布式文件系统 HDFS 中加载数据
>>> lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt") >>> lines = sc.textFile("/user/hadoop/word.txt") >>> lines = sc.textFile("word.txt")
|
通过并行集合(列表)创建 RDD
可以调用 SparkContext
的 parallelize
方法,在 Driver 中一个已经存在的集合(列表)上创建。
>>> array = [1,2,3,4,5] >>> rdd = sc.parallelize(array) >>> rdd.foreach(print) 1 2 3 4 5
|
RDD 操作
转换操作
对于 RDD 而言,每一次转换操作都会产生不同的 RDD,供给下一个 “转换” 使用.
转换得到的 RDD 是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作.
操作 | 含义 |
---|
filter(func) | 筛选出满足函数 func 的元素,并返回一个新的数据集 |
map(func) | 将每个元素传递到函数 func 中,并将结果返回为一个新的数据集 |
flatMap(func) | 与 map () 相似,但每个输入元素都可以映射到 0 或多个输出结果 |
groupByKey() | 应用于 (K,V) 键值对的数据集时,返回一个新的 (K,Iterable) 形式的数据集 |
reduceByKey(func) | 应用于 (K,V) 键值对的数据集时,返回一个新的 (K,V) 形式的数据集,其中每个值是将每个 key 传递到函数 func 中进行聚合后的结果 |
filter(func)
筛选出满足函数 func
的元素,并返回一个新的数据集
>>> lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") >>> linesWithSpark = lines.filter(lambda line: "Spark" in line) >>> linesWithSpark.foreach(print) Spark is better Spark is fast
|
map(func)
map
(func) 操作将每个元素传递到函数 func 中,并将结果返回为一个新的数据集
>>> data = [1,2,3,4,5] >>> rdd1 = sc.parallelize(data) >>> rdd2 = rdd1.map(lambda x:x+10) >>> rdd2.foreach(print) 11 13 12 14 15
|
flatMap(func)
>>> lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") >>> words = lines.flatMap(lambda line:line.split(" "))
|
groupByKey()
应用于 (K,V) 键值对的数据集时,返回一个新的 (K, Iterable) 形式的数据集
>>> words = sc.parallelize([("Hadoop",1),("is",1),("good",1), \ ... ("Spark",1),("is",1),("fast",1),("Spark",1),("is",1),("better",1)]) >>> words1 = words.groupByKey() >>> words1.foreach(print) ('Hadoop', <pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>) ('better', <pyspark.resultiterable.ResultIterable object at 0x7fb210552e80>) ('fast', <pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>) ('good', <pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>) ('Spark', <pyspark.resultiterable.ResultIterable object at 0x7fb210552f98>) ('is', <pyspark.resultiterable.ResultIterable object at 0x7fb210552e10>)
|
reduceByKey(func)
应用于 (K,V) 键值对的数据集时,返回一个新的 (K, V) 形式的数据集,其中的每个值是将每个 key 传递到函数 func 中进行聚合后得到的结果
>>> words = sc.parallelize([("Hadoop",1),("is",1),("good",1),("Spark",1), \ ... ("is",1),("fast",1),("Spark",1),("is",1),("better",1)]) >>> words1 = words.reduceByKey(lambda a,b:a+b) >>> words1.foreach(print) ('good', 1) ('Hadoop', 1) ('better', 1) ('Spark', 2) ('fast', 1) ('is', 3)
|
行动操作
行动操作是真正触发计算的地方。Spark 程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。
操作 | 含义 |
---|
count() | 返回数据集中的元素个数 |
collect() | 以数组的形式返回数据集中的所有元素 |
first() | 返回数据集中的第一个元素 |
take(n) | 以数组的形式返回数据集中的前 n 个元素 |
reduce(func) | 通过函数 func(输入两个参数并返回一个值)聚合数据集中的元素 |
foreach(func) | 将数据集中的每个元素传递到函数 func 中运行 |
惰性机制
所谓的 “惰性机制” 是指,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会触发 “从头到尾” 的真正的计算
这里给出一段简单的语句来解释 Spark 的惰性机制
>>> lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") >>> lineLengths = lines.map(lambda s:len(s)) >>> totalLength = lineLengths.reduce(lambda a,b:a+b) >>> print(totalLength)
|
持久化
在 Spark 中,RDD 采用惰性求值的机制,每次遇到行动操作,都会从头开始执行计算。每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据
>>> list = ["Hadoop","Spark","Hive"] >>> rdd = sc.parallelize(list) >>> print(rdd.count()) //行动操作,触发一次真正从头到尾的计算 3 >>> print(','.join(rdd.collect())) //行动操作,触发一次真正从头到尾的计算 Hadoop,Spark,Hive
|
- 可以通过持久化(缓存)机制避免这种重复计算的开销
- 可以使用
persist()
方法对一个 RDD 标记为持久化 - 之所以说 “标记为持久化”,是因为出现
persist()
语句的地方,并不会马上计算生成 RDD 并把它持久化,而是要等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化 - 持久化后的 RDD 将会被保留在计算节点的内存中被后面的行动操作重复使用
>>> list = ["Hadoop","Spark","Hive"] >>> rdd = sc.parallelize(list) >>> rdd.cache() >>> print(rdd.count()) 3 >>> print(','.join(rdd.collect())) Hadoop,Spark,Hive
|
分区
RDD 是弹性分布式数据集,通常 RDD 很大,会被分成很多个分区,分别保存在不同的节点上
RDD 分区的一个原则是使得分区的个数尽量等于集群中的 CPU 核心(core)数目
键值对 RDD
键值对 RDD 的创建
从文件中加载
可以采用多种方式创建键值对 RDD,其中一种主要方式是使用 map()
函数来实现
>>> lines = sc.textFile("file:///usr/local/spark/mycode/pairrdd/word.txt") >>> pairRDD = lines.flatMap(lambda line:line.split(" ")).map(lambda word:(word,1)) >>> pairRDD.foreach(print) ('I', 1) ('love', 1) ('Hadoop', 1) ……
|
通过并行集合(列表)创建 RDD
>>> list = ["Hadoop","Spark","Hive","Spark"] >>> rdd = sc.parallelize(list) >>> pairRDD = rdd.map(lambda word:(word,1)) >>> pairRDD.foreach(print) (Hadoop,1) (Spark,1) (Hive,1) (Spark,1)
|
常用的键值对 RDD 转换操作
reduceByKey(func)
使用 func 函数合并具有相同键的值
>>> pairRDD = sc.parallelize([("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)]) >>> pairRDD.reduceByKey(lambda a,b:a+b).foreach(print) ('Spark', 2) ('Hive', 1) ('Hadoop', 1)
|
groupByKey()
对具有相同键的值进行分组
>>> list = [("spark",1),("spark",2),("hadoop",3),("hadoop",5)] >>> pairRDD = sc.parallelize(list) >>> pairRDD.groupByKey() PythonRDD[27] at RDD at PythonRDD.scala:48 >>> pairRDD.groupByKey().foreach(print) ('hadoop', <pyspark.resultiterable.ResultIterable object at 0x7f2c1093ecf8>) ('spark', <pyspark.resultiterable.ResultIterable object at 0x7f2c1093ecf8>)
|
sortByKey()
返回一个根据键排序的 RDD
>>> list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)] >>> pairRDD = sc.parallelize(list) >>> pairRDD.foreach(print) ('Hadoop', 1) ('Spark', 1) ('Hive', 1) ('Spark', 1) >>> pairRDD.sortByKey().foreach(print) ('Hadoop', 1) ('Hive', 1) ('Spark', 1) ('Spark', 1)
|
mapValues(func)
对键值对 RDD 中的每个 value 都应用一个函数,但是,key 不会发生变化
>>> list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)] >>> pairRDD = sc.parallelize(list) >>> pairRDD1 = pairRDD.mapValues(lambda x:x+1) >>> pairRDD1.foreach(print) ('Hadoop', 2) ('Spark', 2) ('Hive', 2) ('Spark', 2)
|
join
join
就表示内连接。对于内连接,对于给定的两个输入数据集 (K,V1) 和 (K,V2),只有在两个数据集中都存在的 key 才会被输出,最终得到一个 (K,(V1,V2)) 类型的数据集。
>>> pairRDD1 = sc. \ ... parallelize([("spark",1),("spark",2),("hadoop",3),("hadoop",5)]) >>> pairRDD2 = sc.parallelize([("spark","fast")]) >>> pairRDD3 = pairRDD1.join(pairRDD2) >>> pairRDD3.foreach(print) ('spark', (1, 'fast')) ('spark', (2, 'fast'))
|