Overview
Being familiar with Spark's partitioning matters a great deal for performance tuning. This article summarizes the default number of partitions Spark uses when creating RDDs and DataFrames through its various functions. The defaults mainly depend on sc.defaultParallelism, sc.defaultMinPartitions, and the number of HDFS blocks of the input file, and there are a few pitfall cases where the default is a single partition.
- With few partitions, few tasks run in parallel. In the extreme case of a single partition, only one executor actually does work no matter how many executors you allocate, and on a large dataset the job runs so slowly that it looks stuck. Knowing the default partition count in each situation is therefore essential for Spark tuning, and operators whose result ends up with a single partition deserve special attention. (I have been bitten by this myself: I had allocated plenty of executors, set the default parallelism, and the upstream dataset had enough partitions, yet the result still came back with one partition.)
1. About sc.defaultMinPartitions
sc.defaultMinPartitions = min(sc.defaultParallelism, 2), so sc.defaultMinPartitions can only take the values 1 and 2: it is 2 when sc.defaultParallelism > 1, and 1 when sc.defaultParallelism = 1. This formula comes straight from the source (both definitions live in SparkContext.scala):
/** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
def defaultParallelism: Int = {
  assertNotStopped()
  taskScheduler.defaultParallelism
}

/**
 * Default min number of partitions for Hadoop RDDs when not given by user
 * Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2.
 * The reasons for this are discussed in https://github.com/mesos/spark/pull/718
 */
def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
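As a quick sanity check, the two values can be compared directly in spark-shell (a minimal sketch; the actual defaultParallelism depends on your master, as discussed in the next section):

```scala
// defaultMinPartitions is always min(defaultParallelism, 2)
sc.defaultParallelism     // e.g. 4 with --master local[4]
sc.defaultMinPartitions   // 2, because min(4, 2) = 2
// With --master local, defaultParallelism is 1, so defaultMinPartitions is also 1
```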
2. Setting sc.defaultParallelism
2.1 sc.defaultParallelism can be set via spark.default.parallelism
2.1.1 Setting it in spark-defaults.conf
Add the following line to spark-defaults.conf; the shipped configuration file does not set this parameter:
spark.default.parallelism=20
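The same property can also be set per application instead of globally (a sketch; the property name is standard, the value 20 is only an example):

```scala
// On the command line:
//   spark-submit --conf spark.default.parallelism=20 ...
// Or programmatically, before the SparkContext is created:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("parallelism-demo")
  .config("spark.default.parallelism", "20")
  .getOrCreate()

println(spark.sparkContext.defaultParallelism)  // 20 once the property is set
```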
2.2 Setting the partition count via spark-shell
2.2.1 When no master is specified, spark-shell's defaultParallelism equals the machine's CPU core count
# Check the number of CPU cores on the machine
[hadoop@hadoop001 ~]$ cat /proc/cpuinfo | grep "cpu cores" | uniq
cpu cores : 4
# Start spark-shell
[hadoop@hadoop001 ~]$ spark-shell # using the defaults, no master specified
.
.
scala> sc.defaultParallelism
res0: Int = 4
2.2.2 When spark-shell is started with master set to local
Note: passing --master local to spark-shell and calling .master("local") in code give the same result; spark-shell is used as the example here.
When the master is local, the value is 1; when it is local[n], the value is n; local[*] behaves the same as not specifying a master at all, i.e. the CPU core count.
[hadoop@hadoop001 ~]$ spark-shell --master local # local gives 1
.
.
scala> sc.defaultParallelism
res0: Int = 1
[hadoop@hadoop001 ~]$ spark-shell --master local[30] # local[n] gives n
.
.
scala> sc.defaultParallelism
res0: Int = 30
[hadoop@hadoop001 ~]$ spark-shell --master local[*] # local[*] behaves like no master: the CPU core count
.
.
scala> sc.defaultParallelism
res0: Int = 4
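The same rule applies when the master is set in code rather than on the command line (a minimal sketch; local[3] is just an example value):

```scala
import org.apache.spark.sql.SparkSession

// Setting the master programmatically follows the same local / local[n] / local[*] rule
val spark = SparkSession.builder()
  .master("local[3]")
  .appName("default-parallelism-check")
  .getOrCreate()

println(spark.sparkContext.defaultParallelism)  // 3
```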
3. Default partitions for HDFS files
Note: an RDD can be created in two ways, parallelize and textFile.
Reading an HDFS file into an RDD does not change the values of sc.defaultParallelism or sc.defaultMinPartitions.
3.1 The default case
[hadoop@hadoop001 ~]$ hdfs dfs -du -h /hadoop/compress
1.6 G 1.6 G /hadoop/compress/data.log
# 1.6 GB ≈ 1638 MB; 1638 MB / 128 MB ≈ 12.8, so the file occupies 13 blocks
scala> val rdd = sc.textFile("hdfs://hadoop001:/hadoop/compress/data.log")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://hadoop001:/hadoop/compress/data.log MapPartitionsRDD[3] at textFile at <console>:24
scala> rdd.getNumPartitions
res3: Int = 13
scala> sc.defaultParallelism
res6: Int = 4
3.2 Specifying the partition count
The values above are only defaults because textFile also accepts a partition count: sc.textFile(path, minPartitions), where the second argument sets the minimum number of partitions.
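For reference, the signature in SparkContext.scala shows that the second argument defaults to defaultMinPartitions (body omitted):

```scala
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String]
```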
3.2.1 When sc.defaultParallelism is greater than the block count
[hadoop@hadoop001 ~]$ spark-shell --master local[20]
.
.
scala> val rdd = sc.textFile("hdfs://hadoop001:/hadoop/compress/data.log")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://hadoop001:/hadoop/compress/data.log MapPartitionsRDD[1] at textFile at <console>:24
scala> rdd.getNumPartitions
res0: Int = 13
3.2.2 When sc.defaultParallelism is less than the block count
[hadoop@hadoop001 ~]$ spark-shell --master local[10]
.
.
scala> val rdd = sc.textFile("hdfs://hadoop001:/hadoop/compress/data.log")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://hadoop001:/hadoop/compress/data.log MapPartitionsRDD[1] at textFile at <console>:24
scala> rdd.getNumPartitions
res0: Int = 13
3.2.3 When minPartitions is specified explicitly
scala> val rdd = sc.textFile("hdfs://hadoop001:/hadoop/compress/data.log",20)
rdd: org.apache.spark.rdd.RDD[String] = hdfs://hadoop001:/hadoop/compress/data.log MapPartitionsRDD[3] at textFile at <console>:24
scala> rdd.getNumPartitions
res1: Int = 20
scala> val rdd = sc.textFile("hdfs://hadoop001:/hadoop/compress/data.log",3)
rdd: org.apache.spark.rdd.RDD[String] = hdfs://hadoop001:/hadoop/compress/data.log MapPartitionsRDD[5] at textFile at <console>:24
scala> rdd.getNumPartitions
res2: Int = 13
scala> sc.defaultParallelism
res3: Int = 10
Note: whether sc.defaultParallelism is greater than or less than the block count, the RDD's default partition count equals the block count. When the partition count is specified explicitly there are two cases: if the argument is larger than the block count, the RDD gets the specified number of partitions; otherwise the partition count stays equal to the block count.
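This behavior falls out of how Hadoop's FileInputFormat computes the split size for textFile. The helper below is an illustrative sketch of that logic (the names goalSize and splitSize mirror the Hadoop source; the function itself is not a Spark API):

```scala
// Simplified split-size rule of the old-API FileInputFormat used by textFile:
//   goalSize  = totalSize / minPartitions
//   splitSize = max(minSize, min(goalSize, blockSize)), with minSize defaulting to 1 byte
// The number of partitions is roughly totalSize / splitSize.
def splitSize(totalSize: Long, minPartitions: Int, blockSize: Long, minSize: Long = 1L): Long = {
  val goalSize = totalSize / math.max(minPartitions, 1)
  math.max(minSize, math.min(goalSize, blockSize))
}

val mb = 1024L * 1024L
splitSize(1638L * mb, 2, 128 * mb)   // 128 MB  -> ~13 splits, i.e. one per block
splitSize(1638L * mb, 3, 128 * mb)   // 128 MB  -> still 13 splits, blockSize caps goalSize
splitSize(1638L * mb, 20, 128 * mb)  // ~82 MB  -> ~20 splits, goalSize wins over blockSize
```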
3.3 Testing with a small file
[hadoop@hadoop001 ~]$ hdfs dfs -du -h /input/ # 3-replica cluster
35.4 M 106.1 M /input/data.log
9.5 M 28.4 M /input/data.log.bz2
11.5 M 34.4 M /input/data.log.gz
16.8 M 50.4 M /input/data.log.lzo
16.8 M 50.5 M /input/data.log.snappy
scala> var rdd = sc.textFile("hdfs://hadoop001:/input/data.log.snappy")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://hadoop001:/input/data.log.snappy MapPartitionsRDD[1] at textFile at <console>:24
scala> rdd.getNumPartitions
res0: Int = 1
scala> sc.defaultParallelism
res1: Int = 4
scala> var rdd1 = sc.textFile("hdfs://hadoop001:/input/data.log.snappy",3)
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://hadoop001:/input/data.log.snappy MapPartitionsRDD[3] at textFile at <console>:24
scala> rdd1.getNumPartitions
res2: Int = 1
Note: when the HDFS file is smaller than one block, the default partition count is 1. In this test the count stays at 1 even when minPartitions is specified, because a standalone snappy-compressed file is not splittable, so its single block cannot be divided any further.
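Since a non-splittable compressed file always lands in a single partition, a common follow-up (a sketch, not part of the original test) is to repartition right after reading to recover parallelism:

```scala
// Reading a standalone .snappy file yields a single partition
val rdd = sc.textFile("hdfs://hadoop001:/input/data.log.snappy")
rdd.getNumPartitions            // 1

// repartition incurs a shuffle, but spreads the data across 4 partitions for downstream work
val rdd4 = rdd.repartition(4)
rdd4.getNumPartitions           // 4
```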
4. Default partition count for local files
Local files are rarely used in real work; most data comes from external sources such as HDFS or databases.
4.1 A large local file
[hadoop@hadoop001 data]$ du -sh data_back.log
36M data_back.log
scala> val rdd4 = sc.textFile("file:///home/hadoop/data/data_back.log")
rdd4: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/data/data_back.log MapPartitionsRDD[9] at textFile at <console>:24
scala> rdd4.getNumPartitions
res5: Int = 2
scala> val rdd4 = sc.textFile("file:///home/hadoop/data/data_back.log",20)
rdd4: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/data/data_back.log MapPartitionsRDD[11] at textFile at <console>:24
scala> rdd4.getNumPartitions
res6: Int = 20
scala> val rdd4 = sc.textFile("file:///home/hadoop/data/data_back.log",1)
rdd4: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/data/data_back.log MapPartitionsRDD[13] at textFile at <console>:24
scala> rdd4.getNumPartitions
res7: Int = 2
Note: the file is only 36 MB yet it ends up with 2 partitions, even when minPartitions is set to 1. Local files are split against a fixed 32 MB block size (fs.local.block.size), not the 128 MB HDFS block size: with minPartitions=1 the split size is min(36 MB, 32 MB) = 32 MB, and 36 MB / 32 MB = 1.125 exceeds FileInputFormat's 1.1 split slop, so two splits are produced.
4.2 A small local file
[hadoop@hadoop001 data]$ du -sh emp.txt
4.0K emp.txt
scala> val rdd5 = sc.textFile("file:///home/hadoop/data/emp.txt")
rdd5: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/data/emp.txt MapPartitionsRDD[15] at textFile at <console>:24
scala> rdd5.getNumPartitions
res8: Int = 2
scala> sc.defaultParallelism
res9: Int = 4
scala> val rdd5 = sc.textFile("file:///home/hadoop/data/emp.txt",3)
rdd5: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/data/emp.txt MapPartitionsRDD[17] at textFile at <console>:24
scala> rdd5.getNumPartitions
res10: Int = 3
scala> val rdd5 = sc.textFile("file:///home/hadoop/data/emp.txt",1)
rdd5: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/data/emp.txt MapPartitionsRDD[19] at textFile at <console>:24
scala> rdd5.getNumPartitions
res11: Int = 1
Note: when the file is smaller than 32 MB, it is split according to the default partition count (defaultMinPartitions = 2 here); when a partition count is specified, that value is used instead.
**The 32 MB default block size for local files can be seen in the Hadoop source, in org.apache.hadoop.fs.FileSystem:**
/**
 * Return the number of bytes that large input files should be optimally
 * be split into to minimize i/o time.
 * @deprecated use {@link #getDefaultBlockSize(Path)} instead
 */
@Deprecated
public long getDefaultBlockSize() {
  // default to 32MB: large enough to minimize the impact of seeks
  return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
}
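Because the value is read from fs.local.block.size, it can be raised through the Hadoop configuration before reading the file (a sketch; it assumes the local file system honours this key when computing splits, as the source above suggests):

```scala
// Raise the local block size to 128 MB so the 36 MB file fits into one split
sc.hadoopConfiguration.setLong("fs.local.block.size", 128L * 1024 * 1024)

val rdd = sc.textFile("file:///home/hadoop/data/data_back.log", 1)
rdd.getNumPartitions  // expected: 1, since min(36 MB, 128 MB) = 36 MB covers the whole file
```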
5. Creating an RDD with parallelize
5.1 Without specifying the partition count
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24
scala> rdd1.getNumPartitions
res14: Int = 4
scala> sc.defaultParallelism
res15: Int = 4
5.2 Specifying the partition count
scala> val rdd1 = sc.parallelize(List(1,2,3,4,5),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at parallelize at <console>:24
scala> rdd1.getNumPartitions
res16: Int = 3
scala> rdd1.glom.collect
res32: Array[Array[Int]] = Array(Array(1), Array(2, 3), Array(4, 5))
5.3 Which elements end up in each parallelize partition
First, the definition in the source:
/** Distribute a local Scala collection to form an RDD.
 *
 * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
 * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
 * modified collection. Pass a copy of the argument to avoid this.
 * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
 * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
 * @param seq Scala collection to distribute
 * @param numSlices number of partitions to divide the collection into
 * @return RDD representing distributed collection
 */
def parallelize[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  assertNotStopped()
  new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
So when numSlices is not given, parallelize also falls back to defaultParallelism for the partition count.
The slicing itself is done in ParallelCollectionRDD.scala:
private object ParallelCollectionRDD {
  /**
   * Slice a collection into numSlices sub-collections. One extra thing we do here is to treat Range
   * collections specially, encoding the slices as other Ranges to minimize memory cost. This makes
   * it efficient to run Spark over RDDs representing large sets of numbers. And if the collection
   * is an inclusive Range, we use inclusive range for the last slice.
   */
  def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    if (numSlices < 1) {
      throw new IllegalArgumentException("Positive number of partitions required")
    }
    // Sequences need to be sliced at the same set of index positions for operations
    // like RDD.zip() to behave as expected
    def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
      (0 until numSlices).iterator.map { i =>
        val start = ((i * length) / numSlices).toInt
        val end = (((i + 1) * length) / numSlices).toInt
        (start, end)
      }
    }
    // ... (the remainder of slice, which applies these positions to the sequence, is omitted)
Applying the positions logic above to val rdd1 = sc.parallelize(List(1,2,3,4,5),3), the elements of each partition can be worked out (start is inclusive, end is exclusive):
- Partition 0: start = 0*5/3 = 0, end = (0+1)*5/3 = 1, so it covers indices [0, 1) and holds (1); index 1 itself is excluded.
- Partition 1: start = 1*5/3 = 1, end = (1+1)*5/3 = 3, so it covers indices [1, 3) and holds (2, 3).
- Partition 2: start = 2*5/3 = 3, end = (2+1)*5/3 = 5, so it covers indices [3, 5) and holds (4, 5).
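The same arithmetic can be checked directly in spark-shell with a small sketch that re-implements positions locally (it is not a Spark API call):

```scala
// Mirrors ParallelCollectionRDD.positions: start is inclusive, end is exclusive
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] =
  (0 until numSlices).iterator.map { i =>
    (((i * length) / numSlices).toInt, (((i + 1) * length) / numSlices).toInt)
  }

val data = List(1, 2, 3, 4, 5)
positions(data.length, 3).map { case (start, end) => data.slice(start, end) }.toList
// List(List(1), List(2, 3), List(4, 5)) -- matches rdd1.glom.collect above
```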