Spark 算子所遇到的坑

Posted by JustDoDT on October 17, 2018

1. aggregate算子操作

val rdd3 = sc.parallelize(List("12","23","345","4567"),2)
rdd3.aggregate("")((x,y) => math.max(x.length, y.length).toString, (x,y) => x + y)

# 两个分区先计算出字符串的最大长度然后合成字符串;结果可能是:”24”,也可能是:”42”,体现了并行化特点


val rdd4 = sc.parallelize(List("12","23","345",""),2)
rdd4.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)

# 结果是:”10”,也可能是01”,原因注意有个初始值””,其长度0然后0.toString变成字符串"0".toString的长度为0,"0".toString.length的长度为1 分区可能为(“12”,“23”)(“345”,“”);初始值为""然后初始值和12”,“34比较或者是""345比较然后和“”比较
math.min("".length, "12".length ) 的结果是0 , math.min("0".length, "23".length ) 的结果是1
math.min("".length, "345".length) 的结果是0 , math.min("0".length, "".length)  的结果是0  

val rdd5 = sc.parallelize(List("12","23","","345"),2)
rdd5.aggregate("")((x,y) => math.min(x.length, y.length).toString, (x,y) => x + y)

结果是:”11”,原因如下

math.min("".length, "12".length ) 的结果是0 , math.min("0".length, "23".length ) 的结果是1  

math.min("".length, "".length) 的结果是0 , math.min("0".length, "345".length) 的结果是1  

注意"0".toString的长度为0,"0".toString.length的长度为1

参考博客: Spark高级算子aggregate所遇到的坑

2. redeuce算子

2.1 reduce(_+_)

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[25] at parallelize at <console>:24

scala> rdd1.reduce(_+_)
res33: Int = 15

scala> rdd1.glom.collect
res34: Array[Array[Int]] = Array(Array(1), Array(2), Array(3), Array(4, 5))

scala> rdd1.getNumPartitions
res35: Int = 4

注意:没有指定分区数目,按照默认的分区数目计算

参考博客:Spark-RDD-的分区数量确定

2.2 reduce(_-_)

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5),2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:24

scala> rdd1.glom.collect
res36: Array[Array[Int]] = Array(Array(1, 2), Array(3, 4, 5))

scala> rdd1.reduce(_-_)
res41: Int = 5

scala> rdd1.reduce(_-_)
res42: Int = -5

注意:计算结果有2种情况,因为不知道是哪一个分区当作减数

参考博客:每一个分区数目的确定

3. 总结

在Spark中的算子操作,一定要注意分区数目的情况。