Overview
Spark中常用的算子加起来有几十个,其中针对key的聚合算子有五个,分别是groupBy、groupByKey、reduceByKey、aggregateByKey和flodByKey,有几个底层其实调用的都是一个方法,只不过传入的参数不一样产生了这几个算子,但我仍打算分开来详解每个算子的计算过程,加深理解。
这几个聚合算子要解决的问题都是将所需操作的RDD中的key值相同的value值聚合在一起然后两两做计算也就是聚合最终得出一个结果,这里有三个点需要注意,一是两两聚合的初始值,是从外部传入还是使用默认值;二是分区内聚合方式,因为RDD默认是并行计算,会分成多个分区,每个分区内部可以指定聚合方式;三是分区间聚合方式,拿到分区内的聚合结果就要考虑分区间的聚合方式了,这个参数也可以指定。所以这几种算子的区别就是因为传入了不同的参数。
groupBy
先来说说groupBy,它是最容易理解的,就是把key值相同的value值放在一起形成(key, iter)的键值对,聚合的话需要使用map再对每个key对应的iter内容做聚合。
groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
先来看看源码,需要传入一个group方法f,这个f方法传入待分组的元素,返回另外一个值K,而这个K就是分组的依据,注意看最后groupBy返回的结果类型也是以K和相同K的初始元素生成的迭代器所组成的元组,需要对相同K下的iter进行聚合就需要再进行map操作。
// 计算初始RDD不同首字母开头的元素数量
val rdd: RDD[String] = sc.makeRDD(List("Hello", "Java", "Python", "PHP", "Help"))
// ('H', ("Hello", "Help")), ('J', ("Java")), ('P', ("Python", "PHP"))
val groupRDD: RDD[(Char, Iterable[String])] = rdd.groupBy(_.charAt(0))
// ('H', 2), ('J', 1), ('P', 2)
val sizeRDD: RDD[(Char, Int)] = groupRDD.map(_._2.size)
sizeRDD.collect().foreach(println)
具体过程可以参考下图,第二步添加了File落盘动作,因为group操作会计算每个分区所有单词的首字母并缓存下来,如果放在内存中若数据过多则会产生内存溢出;再就是第三步从文件读取回来,并不一定是三个分区,这里只是为了便于理解。
groupByKey
groupByKey相比于groupBy不同的是,groupBy需要指定分组的key,而groupByKey是将元组这种类型的第一个值作为key,对第二个值进行分组的操作。
groupByKey(): RDD[(K, Iterable[V])]
可以看到这个算子不需要传入参数,就是针对元组这种KV类型定义的,至于返回值的类型,K就是元组的第一个值,Iterable(V)则是相同K值的所有V组成的迭代器,那么同时处理元组类型时groupByKey和groupBy的不同之处就是这里的V是元组内的第二个值,而groupBy是初始的元素值,具体看下面的例子:
// 根据RDD内元组的第一个元素将数据分类并对第二个元素求和
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 3), ("a", 4), ("b", 2)))
// ("a", (1, 3, 4)), ("b", (2))
val groupByKeyRDD: RDD[(String, Iterable[Int])] = sc.groupByKey()
// ("a", (("a", 1), ("a", 3), ("a", 4))), ("b", ("b", 2))
val groupByRDD: RDD[(String, Iterable[(String, Int)])] = sc.groupBy(_._1)
// 当然聚合方式也不相同
groupByKeyRDD.map(_._2.sum)
groupByRDD.map(_._2.map(_._2).sum)
groupByKey也需要落盘操作,会导致数据打乱重组,存在shuffle操作,效率相对来说比较低下,这也就引出了reduceByKey,下面再详细比较两者的不同之处。
reduceByKey
reduceByKey相比于groupByKey就是把map操作集成在算子当中了,不需要再额外进行map操作,它和aggregateByKey以及flodByKey的操作类似,只不过细节之处需要传入不同的参数区分彼此不同的功能。
reduceByKey(func: (V, V) => V): RDD[(K, V)]
可以看到reduceByKey接收一个func参数,而这个func参数接收两个V类型的参数并返回一个V类型的结果,这里的V其实就是初始RDD中的元素,这里需要传入的func就是元素两两计算的逻辑。
// 根据RDD内元组的第一个元素将数据分类并对第二个元素求和
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 3), ("a", 4), ("b", 2)))
val ReduceByKeyRDD: RDD[(String, Int)] = rdd.reduceByKey(_ + _) // ("a", 8), ("b", 2)
ReduceByKeyRDD.collect().foreach(println)
从下图中的第一张图看相对于groupByKey只是少了map的步骤将它整合在reduceByKey中,但是实际上reduceByKey的作用不止于此,第二张图才是实际的运行模式,它提供了Combine预聚合的功能,支持在分区中先进行聚合,称作分区内聚合,然后再落盘等待分区间聚合。这样下来它不只是减少了map的操作,同时提供了分区内聚合使得shuffle落盘时的数据量尽量小,IO效率也会提高不少。最后它引出了分区内聚合和分区间聚合,reduceByKey的分区内聚合和分区间聚合是一样的。
aggregateByKey
aggregateByKey是对reduceByKey的高级应用,它可以分开来指定分区内聚合和分区间聚合,并提供了一个计算初始值。
aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
来看上方源码,它采用柯里化操作,第一个参数列表接收一个参数zeroValue,它提供一个初始值,不同于reduceByKey直接开始计算第一个元素和第二个元素,aggregateByKey允许先用初始值和第一个元素进行两两计算;第二个参数列表接收两个参数,第一个是seqOp表示分区内聚合方式,它接收两个参数返回一个参数,注意接收的参数一个U的类型和zeroValue类型相同,另外一个是初始元素的类型,返回类型是U类型,说明返回类型是由zeroValue决定的,这很重要;第二个参数combOp表示分区间聚合方式,接收两个U类型的参数并返回一个U类型的参数。最终返回初始元素和聚合后的元素。
// 给定初始RDD并指定两个分区,分区内计算最大值分区间求和
val rdd: RDD[(String, Int)] = sc.makeRDD(
List(("a",1), ("a", 2), ("b", 3), ("a", 4), ("a", 5), ("b", 6)),
numSlices = 2
)
// (("a", 8), ("b", 8))
val aggregateByKeyRDD: RDD[(String, Int)] = rdd.aggregateByKey(0)(
(x, y) => math.max(x, y), // 分区内求最大值
(x, y) => x + y // 分区间求和
)
aggregateByKeyRDD.collect().foreach(println)
看运行过程就更清晰了,相比于reduceByKey只是将分区内聚合和分区间聚合分开来了,并且提供了一个初始值,这个初始值作为第一个元素与初始RDD的第一个元素计算,这也就使得初始值不一样哪怕聚合方式相同结果也可能不一样,详情看下图。其次就是分区数量对结果的影响,上方例子如果按三个分区计算结果又不一样了,它作为aggregateByKey的第四个决定结果的隐形参数在聚合时也需要考虑在内。
flodByKey
flodByKey是aggregateByKey的特例情况,在分区内聚合方式和分区间聚合方式相同的时候使用。
foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
仍然是柯里化传参,第一个参数列表给定一个初始值,第二个参数列表传入一个聚合函数func,在一定条件下和reduceByKey的结果和聚合方式是相同的。
// 计算初始值与所有元素的和
val rdd: RDD[Int] = sc.makeRDD(List(("a", 1), ("a", 3), ("a", 4), ("a", 2)), 2)
val foldByKeyRDD: RDD[Int] = rdd.foldByKey(10)(_ + _) // 10 + 1 + 3 + 10 + 4 + 2 = 30
看运行过程还是比较容易理解的,尤其需要注意初始值的设定,不然会产生意想不到的结果。
Compare
前面把每个算子的详细计算过程都画了一遍,接下来从源码中函数的接收参数中继续看reduceByKey、aggregateByKey和flodByKey这三个算子的联系和不同之处,它们的源码中都调用了一个函数combineByKeyWithClassTag,接下来来看一看传入的参数。从它们源码调用的函数就可以很清楚的区分这几个算子了,结合不同环境使用不同的算子。
// reduceByKey
combineByKeyWithClassTag[V]((v: V) =>
v, // 就是初始RDD的值,当作每个分区开始计算的初始值,不需要指定
func, // 分区内聚合
func, // 分区间聚合
partitioner)
// aggregateByKey
combineByKeyWithClassTag[U]((v: V) =>
cleanedSeqOp(createZero(), v), // 初始值,柯里化的第一个参数列表
cleanedSeqOp, // 分区内聚合,柯里化的第二个参数列表
combOp, // 分区间聚合,与分区内聚合不相同
partitioner)
// flodByKey
combineByKeyWithClassTag[V]((v: V) =>
cleanedFunc(createZero(), v), // 初始值,柯里化的第一个参数列表
cleanedFunc, // 分区内聚合,柯里化的第二个参数列表
cleanedFunc, // 分区间聚合,与分区内聚合相同
partitioner)
Application
获取首字母相同key数据的和
val rdd: RDD[(String, Int)] = sc.makeRDD(
List(("Hello", 1), ("Java", 3), ("Python", 5), ("PHP", 7), ("Help", 9)))
rdd.map(kv => (kv._1.charAt(0), kv._2)) // 先将原始RDD的首字母提出来
.reduceByKey(_ + _) // 再按照key进行求和
.collect()
.foreach(println) // ("P",12), ("H", 10), ("C", 3)
获取相同key数据的平均值
val rdd: RDD[(String, Int)] = sc.makeRDD(
List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("a", 5), ("a", 6), ("b", 7), ("b", 8)), 2)
rdd.aggregateByKey((0.0, 0))( // 元组第一个元素接收求和数据,0.0避免求均值强转为Int,第二个接收数据计数
(k, v) => (k._1 + v, k._2 + 1), // 分区内按key累加、计数
(k, v) => (k._1 + v._1, k._2 + v._2) // 分区间将分区内的统计结果累加
)
.map(kv => (kv._1, kv._2._1 / kv._2._2)) // 求均值
.collect()
.foreach(println) // ("a",3.5), ("b", 5.5)
获取相同key的数据分区内求均值分区间求和的结果
rdd.aggregateByKey((0.0, 0))(
(k, v) => ((k._1 + v), k._2 + 1),
(k, v) => (k._1 / k._2 + v._1 / v._2, k._2 + v._2) // 直接在这一步先计算分区内均值再求和
)
.collect()
.foreach(println) // ("a",(7.0, 4)), ("b", (11.0, 4))
数据如下所示每一行数据是一条点击记录,字段分别为(时间戳 省份 市 用户 广告),计算每个省份点击次数前三名的广告。
1516609143867 6 7 64 16
1516609143869 9 4 75 18
1516609143869 1 7 87 12
1516609143869 2 8 92 9
1516609143869 6 7 84 24
1516609143869 1 8 95 5
1516609143869 8 1 90 29
1516609143869 3 3 36 16
1516609143869 3 3 54 22
1516609143869 7 6 33 5
思考的重点是中间数据结构的转换,刚开始计算的key是省份+广告,后面的key就只有省份了,需要在省份内部做计算。计算过程如下:
val original: RDD[String] = sc.textFile("data/agent.log") // 时间戳 省份 市 用户 广告
val mapRDD: RDD[((String, String), Int)] = original.map(str => {
val strings: Array[String] = str.split(" ") // 拆分数据并取到省份和广告字段
((strings(1), strings(4)), 1)
})
// 计算每个省份每条广告的点击人数
val reduceRDD: RDD[((String, String), Int)] = mapRDD.reduceByKey(_ + _)
// 转换数据结构,因为最终是计算每个省份内的,所以省份是key,将广告跟点击数放一起
val newMapRDD: RDD[(String, (String, Int))] = reduceRDD.map{
case ((pro, ad), sum) => (pro, (ad, sum))
}
// groupBy省份,将所有省份下的所有广告点击数放一起
val groupRDD: RDD[(String, Iterable[(String, Int)])] = newMapRDD.groupByKey()
// 在每个省份内排序,取前三条数据
val mapValuesRDD: RDD[(String, List[(String, Int)])] = groupRDD.mapValues(iter => {
iter.toList.sortWith(_._2 > _._2).take(3) // sortBy(_._2)(Ordering.Int.reverse)
})
mapValuesRDD.collect().foreach(println)
admin