Spark RDD编程

RDD编程基础

RDD创建

从文件系统中加载数据创建RDD

Spark采用textFile()方法来从文件系统中加载数据创建RDD
该方法把文件的URI作为参数,这个URI可以是

  • 本地文件系统的地址
  • 分布式文件系统HDFS的地址
  • AmazonS3的地址
  • 等等
    从本地文件系统中加载数据创建RDD
    1
    2
    3
    4
    5
    >>> lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
    >>> lines.foreach(print)
    Hadoop is good
    Spark is fast
    Spark is better

    从分布式文件系统HDFS中加载数据
    1
    2
    3
    >>> lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
    >>> lines = sc.textFile("/user/hadoop/word.txt")
    >>> lines = sc.textFile("word.txt")

    通过并行集合(列表)创建RDD

    可以调用SparkContext的parallelize方法,在Driver中一个已经存在的集合(列表)上创建。
    1
    2
    3
    4
    5
    6
    7
    8
    >>> array = [1,2,3,4,5]
    >>> rdd = sc.parallelize(array)
    >>> rdd.foreach(print)
    1
    2
    3
    4
    5


    RDD操作

    转换操作

    对于RDD而言,每一次转换操作都会产生不同的RDD,供给下一个“转换”使用.
    转换得到的RDD是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作.
操作含义
filter(func)筛选出满足函数func的元素,并返回一个新的数据集
map(func)将每个元素传递到函数func中,并将结果返回为一个新的数据集
flatMap(func)与map()相似,但每个输入元素都可以映射到0或多个输出结果
groupByKey()应用于(K,V)键值对的数据集时,返回一个新的(K,Iterable)形式的数据集
reduceByKey(func)应用于(K,V)键值对的数据集时,返回一个新的(K,V)形式的数据集,其中每个值是将每个key传递到函数func中进行聚合后的结果

filter(func)

筛选出满足函数func的元素,并返回一个新的数据集

1
2
3
4
5
>>>  lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
>>> linesWithSpark = lines.filter(lambda line: "Spark" in line)
>>> linesWithSpark.foreach(print)
Spark is better
Spark is fast


map(func)

map(func)操作将每个元素传递到函数func中,并将结果返回为一个新的数据集

1
2
3
4
5
6
7
8
9
>>> data = [1,2,3,4,5]
>>> rdd1 = sc.parallelize(data)
>>> rdd2 = rdd1.map(lambda x:x+10)
>>> rdd2.foreach(print)
11
13
12
14
15


flatMap(func)

1
2
>>> lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
>>> words = lines.flatMap(lambda line:line.split(" "))


groupByKey()

应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集

1
2
3
4
5
6
7
8
9
10
>>> words = sc.parallelize([("Hadoop",1),("is",1),("good",1), \
... ("Spark",1),("is",1),("fast",1),("Spark",1),("is",1),("better",1)])
>>> words1 = words.groupByKey()
>>> words1.foreach(print)
('Hadoop', <pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)
('better', <pyspark.resultiterable.ResultIterable object at 0x7fb210552e80>)
('fast', <pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)
('good', <pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)
('Spark', <pyspark.resultiterable.ResultIterable object at 0x7fb210552f98>)
('is', <pyspark.resultiterable.ResultIterable object at 0x7fb210552e10>)



reduceByKey(func)

应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合后得到的结果

1
2
3
4
5
6
7
8
9
10
>>> words = sc.parallelize([("Hadoop",1),("is",1),("good",1),("Spark",1), \
... ("is",1),("fast",1),("Spark",1),("is",1),("better",1)])
>>> words1 = words.reduceByKey(lambda a,b:a+b)
>>> words1.foreach(print)
('good', 1)
('Hadoop', 1)
('better', 1)
('Spark', 2)
('fast', 1)
('is', 3)

**

行动操作

行动操作是真正触发计算的地方。Spark程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。

操作含义
count()返回数据集中的元素个数
collect()以数组的形式返回数据集中的所有元素
first()返回数据集中的第一个元素
take(n)以数组的形式返回数据集中的前n个元素
reduce(func)通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
foreach(func)将数据集中的每个元素传递到函数func中运行

惰性机制

所谓的“惰性机制”是指,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会触发“从头到尾”的真正的计算
这里给出一段简单的语句来解释Spark的惰性机制

1
2
3
4
>>> lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
>>> lineLengths = lines.map(lambda s:len(s))
>>> totalLength = lineLengths.reduce(lambda a,b:a+b)
>>> print(totalLength)

持久化

在Spark中,RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执行计算。每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据

1
2
3
4
5
6
>>> list = ["Hadoop","Spark","Hive"]
>>> rdd = sc.parallelize(list)
>>> print(rdd.count()) //行动操作,触发一次真正从头到尾的计算
3
>>> print(','.join(rdd.collect())) //行动操作,触发一次真正从头到尾的计算
Hadoop,Spark,Hive
  • 可以通过持久化(缓存)机制避免这种重复计算的开销
  • 可以使用persist()方法对一个RDD标记为持久化
  • 之所以说“标记为持久化”,是因为出现persist()语句的地方,并不会马上计算生成RDD并把它持久化,而是要等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化
  • 持久化后的RDD将会被保留在计算节点的内存中被后面的行动操作重复使用
    1
    2
    3
    4
    5
    6
    7
    >>> list = ["Hadoop","Spark","Hive"]
    >>> rdd = sc.parallelize(list)
    >>> rdd.cache() #会调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,因为这时rdd还没有被计算生成
    >>> print(rdd.count()) #第一次行动操作,触发一次真正从头到尾的计算,这时上面的rdd.cache()才会被执行,把这个rdd放到缓存中
    3
    >>> print(','.join(rdd.collect())) #第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd
    Hadoop,Spark,Hive

    分区

    RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上
    RDD分区的一个原则是使得分区的个数尽量等于集群中的CPU核心(core)数目

    键值对RDD

    键值对RDD的创建

    从文件中加载

    可以采用多种方式创建键值对RDD,其中一种主要方式是使用map()函数来实现
    1
    2
    3
    4
    5
    6
    7
    >>> lines = sc.textFile("file:///usr/local/spark/mycode/pairrdd/word.txt")
    >>> pairRDD = lines.flatMap(lambda line:line.split(" ")).map(lambda word:(word,1))
    >>> pairRDD.foreach(print)
    ('I', 1)
    ('love', 1)
    ('Hadoop', 1)
    ……

    通过并行集合(列表)创建RDD

    1
    2
    3
    4
    5
    6
    7
    8
    >>> list = ["Hadoop","Spark","Hive","Spark"]
    >>> rdd = sc.parallelize(list)
    >>> pairRDD = rdd.map(lambda word:(word,1))
    >>> pairRDD.foreach(print)
    (Hadoop,1)
    (Spark,1)
    (Hive,1)
    (Spark,1)

    常用的键值对RDD转换操作

    reduceByKey(func)

    使用func函数合并具有相同键的值
    1
    2
    3
    4
    5
    >>> pairRDD = sc.parallelize([("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)])
    >>> pairRDD.reduceByKey(lambda a,b:a+b).foreach(print)
    ('Spark', 2)
    ('Hive', 1)
    ('Hadoop', 1)

    groupByKey()

    对具有相同键的值进行分组
    1
    2
    3
    4
    5
    6
    7
    >>> list = [("spark",1),("spark",2),("hadoop",3),("hadoop",5)]
    >>> pairRDD = sc.parallelize(list)
    >>> pairRDD.groupByKey()
    PythonRDD[27] at RDD at PythonRDD.scala:48
    >>> pairRDD.groupByKey().foreach(print)
    ('hadoop', <pyspark.resultiterable.ResultIterable object at 0x7f2c1093ecf8>)
    ('spark', <pyspark.resultiterable.ResultIterable object at 0x7f2c1093ecf8>)

    sortByKey()

    返回一个根据键排序的RDD
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    >>> list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)]
    >>> pairRDD = sc.parallelize(list)
    >>> pairRDD.foreach(print)
    ('Hadoop', 1)
    ('Spark', 1)
    ('Hive', 1)
    ('Spark', 1)
    >>> pairRDD.sortByKey().foreach(print)
    ('Hadoop', 1)
    ('Hive', 1)
    ('Spark', 1)
    ('Spark', 1)

    mapValues(func)

    对键值对RDD中的每个value都应用一个函数,但是,key不会发生变化
    1
    2
    3
    4
    5
    6
    7
    8
    >>> list = [("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)]
    >>> pairRDD = sc.parallelize(list)
    >>> pairRDD1 = pairRDD.mapValues(lambda x:x+1)
    >>> pairRDD1.foreach(print)
    ('Hadoop', 2)
    ('Spark', 2)
    ('Hive', 2)
    ('Spark', 2)

    join

    join就表示内连接。对于内连接,对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个(K,(V1,V2))类型的数据集。
    1
    2
    3
    4
    5
    6
    7
    >>> pairRDD1 = sc. \
    ... parallelize([("spark",1),("spark",2),("hadoop",3),("hadoop",5)])
    >>> pairRDD2 = sc.parallelize([("spark","fast")])
    >>> pairRDD3 = pairRDD1.join(pairRDD2)
    >>> pairRDD3.foreach(print)
    ('spark', (1, 'fast'))
    ('spark', (2, 'fast'))

评论