推荐答案
RDD 的 Transformation 操作主要包括以下几类:
- Map:对 RDD 中的每个元素应用一个函数,返回一个新的 RDD。
- Filter:对 RDD 中的每个元素应用一个条件函数,返回满足条件的元素组成的新 RDD。
- FlatMap:类似于 Map,但每个输入元素可以映射到零个或多个输出元素。
- MapPartitions:对 RDD 的每个分区应用一个函数,返回一个新的 RDD。
- MapPartitionsWithIndex:类似于 MapPartitions,但提供了分区的索引。
- Sample:从 RDD 中随机抽取一部分数据,返回一个新的 RDD。
- Union:将两个 RDD 合并,返回一个新的 RDD。
- Intersection:返回两个 RDD 的交集,组成一个新的 RDD。
- Distinct:返回 RDD 中所有不同的元素组成的新 RDD。
- GroupByKey:将 RDD 中的键值对按照键进行分组,返回一个新的 RDD。
- ReduceByKey:将 RDD 中的键值对按照键进行分组,并对每个键的值进行归约操作。
- AggregateByKey:类似于 ReduceByKey,但允许使用不同的初始值和归约函数。
- SortByKey:按照键对 RDD 中的键值对进行排序,返回一个新的 RDD。
- Join:将两个 RDD 按照键进行连接操作,返回一个新的 RDD。
- Cogroup:将多个 RDD 按照键进行分组,返回一个新的 RDD。
- Cartesian:返回两个 RDD 的笛卡尔积,组成一个新的 RDD。
- Pipe:将 RDD 的每个分区通过一个外部命令进行处理,返回一个新的 RDD。
- Coalesce:减少 RDD 的分区数,返回一个新的 RDD。
- Repartition:重新分区 RDD,返回一个新的 RDD。
- RepartitionAndSortWithinPartitions:重新分区 RDD,并在每个分区内按照键进行排序。
本题详细解读
1. Map
Map
操作是 RDD 中最常用的 Transformation 操作之一。它会对 RDD 中的每个元素应用一个函数,并返回一个新的 RDD。例如:
val rdd = sc.parallelize(Seq(1, 2, 3, 4)) val mappedRDD = rdd.map(x => x * 2) // 结果为:2, 4, 6, 8
2. Filter
Filter
操作会根据给定的条件函数过滤 RDD 中的元素,返回满足条件的元素组成的新 RDD。例如:
val rdd = sc.parallelize(Seq(1, 2, 3, 4)) val filteredRDD = rdd.filter(x => x % 2 == 0) // 结果为:2, 4
3. FlatMap
FlatMap
操作类似于 Map
,但每个输入元素可以映射到零个或多个输出元素。通常用于将嵌套结构扁平化。例如:
val rdd = sc.parallelize(Seq("Hello World", "Hi There")) val flatMappedRDD = rdd.flatMap(x => x.split(" ")) // 结果为:Hello, World, Hi, There
4. MapPartitions
MapPartitions
操作会对 RDD 的每个分区应用一个函数,返回一个新的 RDD。与 Map
不同的是,MapPartitions
是以分区为单位进行操作的。例如:
val rdd = sc.parallelize(Seq(1, 2, 3, 4), 2) val mapPartitionsRDD = rdd.mapPartitions(iter => iter.map(x => x * 2)) // 结果为:2, 4, 6, 8
5. MapPartitionsWithIndex
MapPartitionsWithIndex
操作类似于 MapPartitions
,但它还提供了分区的索引。例如:
val rdd = sc.parallelize(Seq(1, 2, 3, 4), 2) val mapPartitionsWithIndexRDD = rdd.mapPartitionsWithIndex((index, iter) => iter.map(x => (index, x * 2))) // 结果为:(0, 2), (0, 4), (1, 6), (1, 8)
6. Sample
Sample
操作用于从 RDD 中随机抽取一部分数据,返回一个新的 RDD。可以指定是否放回抽样以及抽样的比例。例如:
val rdd = sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) val sampledRDD = rdd.sample(withReplacement = false, fraction = 0.5) // 结果可能为:1, 3, 5, 7, 9
7. Union
Union
操作将两个 RDD 合并,返回一个新的 RDD。例如:
val rdd1 = sc.parallelize(Seq(1, 2, 3)) val rdd2 = sc.parallelize(Seq(4, 5, 6)) val unionRDD = rdd1.union(rdd2) // 结果为:1, 2, 3, 4, 5, 6
8. Intersection
Intersection
操作返回两个 RDD 的交集,组成一个新的 RDD。例如:
val rdd1 = sc.parallelize(Seq(1, 2, 3)) val rdd2 = sc.parallelize(Seq(2, 3, 4)) val intersectionRDD = rdd1.intersection(rdd2) // 结果为:2, 3
9. Distinct
Distinct
操作返回 RDD 中所有不同的元素组成的新 RDD。例如:
val rdd = sc.parallelize(Seq(1, 2, 2, 3, 3, 3)) val distinctRDD = rdd.distinct() // 结果为:1, 2, 3
10. GroupByKey
GroupByKey
操作将 RDD 中的键值对按照键进行分组,返回一个新的 RDD。例如:
val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3))) val groupByKeyRDD = rdd.groupByKey() // 结果为:("a", Seq(1, 3)), ("b", Seq(2))
11. ReduceByKey
ReduceByKey
操作将 RDD 中的键值对按照键进行分组,并对每个键的值进行归约操作。例如:
val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3))) val reduceByKeyRDD = rdd.reduceByKey((x, y) => x + y) // 结果为:("a", 4), ("b", 2)
12. AggregateByKey
AggregateByKey
操作类似于 ReduceByKey
,但允许使用不同的初始值和归约函数。例如:
val rdd = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3))) val aggregateByKeyRDD = rdd.aggregateByKey(0)((x, y) => x + y, (x, y) => x + y) // 结果为:("a", 4), ("b", 2)
13. SortByKey
SortByKey
操作按照键对 RDD 中的键值对进行排序,返回一个新的 RDD。例如:
val rdd = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3))) val sortByKeyRDD = rdd.sortByKey() // 结果为:("a", 1), ("b", 2), ("c", 3)
14. Join
Join
操作将两个 RDD 按照键进行连接操作,返回一个新的 RDD。例如:
val rdd1 = sc.parallelize(Seq(("a", 1), ("b", 2))) val rdd2 = sc.parallelize(Seq(("a", 3), ("c", 4))) val joinRDD = rdd1.join(rdd2) // 结果为:("a", (1, 3))
15. Cogroup
Cogroup
操作将多个 RDD 按照键进行分组,返回一个新的 RDD。例如:
val rdd1 = sc.parallelize(Seq(("a", 1), ("b", 2))) val rdd2 = sc.parallelize(Seq(("a", 3), ("c", 4))) val cogroupRDD = rdd1.cogroup(rdd2) // 结果为:("a", (Seq(1), Seq(3))), ("b", (Seq(2), Seq())), ("c", (Seq(), Seq(4)))
16. Cartesian
Cartesian
操作返回两个 RDD 的笛卡尔积,组成一个新的 RDD。例如:
val rdd1 = sc.parallelize(Seq(1, 2)) val rdd2 = sc.parallelize(Seq("a", "b")) val cartesianRDD = rdd1.cartesian(rdd2) // 结果为:(1, "a"), (1, "b"), (2, "a"), (2, "b")
17. Pipe
Pipe
操作将 RDD 的每个分区通过一个外部命令进行处理,返回一个新的 RDD。例如:
val rdd = sc.parallelize(Seq("Hello", "World")) val pipedRDD = rdd.pipe("wc -l") // 结果为:1, 1
18. Coalesce
Coalesce
操作用于减少 RDD 的分区数,返回一个新的 RDD。例如:
val rdd = sc.parallelize(Seq(1, 2, 3, 4), 4) val coalescedRDD = rdd.coalesce(2) // 分区数减少为2
19. Repartition
Repartition
操作用于重新分区 RDD,返回一个新的 RDD。例如:
val rdd = sc.parallelize(Seq(1, 2, 3, 4), 2) val repartitionedRDD = rdd.repartition(4) // 分区数增加为4
20. RepartitionAndSortWithinPartitions
RepartitionAndSortWithinPartitions
操作用于重新分区 RDD,并在每个分区内按照键进行排序。例如:
val rdd = sc.parallelize(Seq(("b", 2), ("a", 1), ("c", 3))) val repartitionAndSortRDD = rdd.repartitionAndSortWithinPartitions(new HashPartitioner(2)) // 结果为:("a", 1), ("b", 2), ("c", 3)