Determining the Number of Partitions of a Spark RDD

Posted by JustDoDT on April 23, 2018

Overview

Understanding Spark partitioning is important for performance tuning. This post summarizes the default number of partitions you get when creating RDDs and DataFrames through various functions. The result mainly depends on sc.defaultParallelism, sc.defaultMinPartitions, and the number of HDFS blocks of the input file; there are also some nasty cases where the default partition count is 1.

  • If there are few partitions, few tasks run in parallel. In the extreme case of a single partition, only one executor actually does work no matter how many you allocated, so a large dataset runs very slowly and the job looks stuck. Knowing the default partition count in each situation is therefore essential for Spark tuning, especially for operators whose result ends up with a single partition. (I was bitten by this myself: I had allocated plenty of executors, set the default parallelism, and the upstream dataset had many partitions, yet the result still had only 1 partition.)

1. About sc.defaultMinPartitions

sc.defaultMinPartitions = min(sc.defaultParallelism, 2), so sc.defaultMinPartitions can only be 1 or 2: it is 2 when sc.defaultParallelism > 1 and 1 when sc.defaultParallelism = 1. This formula comes from the source code (both definitions live in SparkContext.scala):

/** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */
  def defaultParallelism: Int = {
    assertNotStopped()
    taskScheduler.defaultParallelism
  }

  /**
   * Default min number of partitions for Hadoop RDDs when not given by user
   * Notice that we use math.min so the "defaultMinPartitions" cannot be higher than 2.
   * The reasons for this are discussed in https://github.com/mesos/spark/pull/718
   */
  def defaultMinPartitions: Int = math.min(defaultParallelism, 2)
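
For example, you can check both values directly in a spark-shell session. The output below is a sketch assuming a 4-core machine with no explicit spark.default.parallelism, matching the environment used in the later examples:

scala> sc.defaultParallelism
res0: Int = 4

scala> sc.defaultMinPartitions   // min(4, 2)
res1: Int = 2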

2. Setting sc.defaultParallelism

2.1 Setting sc.defaultParallelism via spark.default.parallelism

2.1.1 In spark-defaults.conf

Add the following line to spark-defaults.conf; the default configuration file does not set this parameter:

spark.default.parallelism=20
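
Besides the config file, the same property can be set per application, for example programmatically through SparkConf. A minimal sketch (the application name is just a placeholder; spark-submit --conf spark.default.parallelism=20 works as well):

import org.apache.spark.{SparkConf, SparkContext}

// Equivalent to the spark-defaults.conf entry above, but scoped to this application.
val conf = new SparkConf()
  .setAppName("parallelism-demo")            // placeholder name
  .set("spark.default.parallelism", "20")
val sc = new SparkContext(conf)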

2.2 Setting the partition count via spark-shell

2.2.1 When spark-shell is started without specifying a master, the partition count equals the number of CPU cores

# Check the number of CPU cores on the machine
[hadoop@hadoop001 ~]$ cat /proc/cpuinfo | grep "cpu cores" | uniq
cpu cores       : 4

# Start spark-shell
[hadoop@hadoop001 ~]$ spark-shell  # using the default parallelism here
.
.
scala> sc.defaultParallelism
res0: Int = 4

2.2.2 When spark-shell is started with master set to local

Note: passing --master local to spark-shell and calling .master("local") in code give the same result; spark-shell is used as the example here.

When the master is local, the value is 1; when it is local[n], the value is n; local[*] behaves the same as not specifying a master at all: the value equals the number of CPU cores.

[hadoop@hadoop001 ~]$ spark-shell --master local  # with local, the value is 1
.
.
scala> sc.defaultParallelism
res0: Int = 1


[hadoop@hadoop001 ~]$ spark-shell --master local[30]  # with local[n], the value is n
.
.
scala> sc.defaultParallelism
res0: Int = 30


[hadoop@hadoop001 ~]$ spark-shell --master local[*] # local[*] is the same as no master: number of CPU cores
.
.
scala> sc.defaultParallelism
res0: Int = 4
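
The same values apply when the master is set in code instead of on the spark-shell command line. A minimal SparkSession sketch (the app name and core count are placeholders):

import org.apache.spark.sql.SparkSession

// .master("local")    -> defaultParallelism = 1
// .master("local[n]") -> defaultParallelism = n
// .master("local[*]") -> defaultParallelism = number of CPU cores
val spark = SparkSession.builder()
  .appName("master-demo")   // placeholder name
  .master("local[4]")
  .getOrCreate()
println(spark.sparkContext.defaultParallelism)   // 4 with local[4]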

3. Default Partitions for HDFS Files

Note: an RDD can be created in two ways, with parallelize and with textFile; this section covers textFile (parallelize is covered in section 5).

Reading an HDFS file into an RDD does not change the values of sc.defaultParallelism and sc.defaultMinPartitions.

3.1 The default case

[hadoop@hadoop001 ~]$ hdfs dfs -du -h /hadoop/compress   
1.6 G  1.6 G  /hadoop/compress/data.log 
# 1.6 GB ≈ 1638 MB; 1638 MB / 128 MB ≈ 12.8, i.e. 13 blocks

scala> val rdd = sc.textFile("hdfs://hadoop001:/hadoop/compress/data.log")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://hadoop001:/hadoop/compress/data.log MapPartitionsRDD[3] at textFile at <console>:24

scala> rdd.getNumPartitions
res3: Int = 13

scala> sc.defaultParallelism
res6: Int = 4

3.2 Specifying the partition count

These are called default partitions because textFile can also take an explicit partition count: sc.textFile(path, minPartitions), where the second parameter specifies the (minimum) number of partitions.

3.2.1 When sc.defaultParallelism is greater than the number of blocks

[hadoop@hadoop001 ~]$ spark-shell --master local[20]
.
.
scala> val rdd = sc.textFile("hdfs://hadoop001:/hadoop/compress/data.log")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://hadoop001:/hadoop/compress/data.log MapPartitionsRDD[1] at textFile at <console>:24

scala> rdd.getNumPartitions
res0: Int = 13

3.2.2 When sc.defaultParallelism is less than the number of blocks

[hadoop@hadoop001 ~]$ spark-shell --master local[10]
.
.
scala> val rdd = sc.textFile("hdfs://hadoop001:/hadoop/compress/data.log")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://hadoop001:/hadoop/compress/data.log MapPartitionsRDD[1] at textFile at <console>:24

scala> rdd.getNumPartitions
res0: Int = 13

3.2.3 When minPartitions is specified explicitly

scala> val rdd = sc.textFile("hdfs://hadoop001:/hadoop/compress/data.log",20)
rdd: org.apache.spark.rdd.RDD[String] = hdfs://hadoop001:/hadoop/compress/data.log MapPartitionsRDD[3] at textFile at <console>:24

scala> rdd.getNumPartitions
res1: Int = 20

scala> val rdd = sc.textFile("hdfs://hadoop001:/hadoop/compress/data.log",3)
rdd: org.apache.spark.rdd.RDD[String] = hdfs://hadoop001:/hadoop/compress/data.log MapPartitionsRDD[5] at textFile at <console>:24

scala> rdd.getNumPartitions
res2: Int = 13

scala> sc.defaultParallelism
res3: Int = 10

Note: whether sc.defaultParallelism is greater or less than the number of blocks, the RDD's default partition count equals the number of blocks. When the partition count is specified explicitly, there are two cases: if the argument is greater than the number of blocks, the RDD gets the specified number of partitions; otherwise it keeps the number of blocks (the argument is only a minimum).
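
These observations match the split computation in org.apache.hadoop.mapred.FileInputFormat, which textFile uses under the hood: goalSize = totalSize / minPartitions and splitSize = max(minSize, min(goalSize, blockSize)). Below is a rough sketch of that arithmetic for the 1.6 GB file above, with sizes in MB and minSize treated as negligible; it is an approximation for intuition, not the exact byte-level implementation:

// Approximate FileInputFormat split math (sizes in MB, minSize ignored).
def expectedSplits(totalMB: Double, blockMB: Double, minPartitions: Int): Int = {
  val goalSize  = totalMB / minPartitions
  val splitSize = math.min(goalSize, blockMB)
  math.ceil(totalMB / splitSize).toInt
}

expectedSplits(1638.4, 128, 20)  // 20: goalSize ~82 MB < block size, so 20 smaller splits
expectedSplits(1638.4, 128, 3)   // 13: goalSize is capped at the 128 MB block size
expectedSplits(1638.4, 128, 2)   // 13: same cap with the default minPartitions = 2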

3.3 Testing with small files

[hadoop@hadoop001 ~]$ hdfs dfs -du -h /input/    # replication factor 3
35.4 M  106.1 M  /input/data.log
9.5 M   28.4 M   /input/data.log.bz2
11.5 M  34.4 M   /input/data.log.gz
16.8 M  50.4 M   /input/data.log.lzo
16.8 M  50.5 M   /input/data.log.snappy

scala> var rdd = sc.textFile("hdfs://hadoop001:/input/data.log.snappy")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://hadoop001:/input/data.log.snappy MapPartitionsRDD[1] at textFile at <console>:24

scala> rdd.getNumPartitions
res0: Int = 1

scala> sc.defaultParallelism
res1: Int = 4

scala> var rdd1 = sc.textFile("hdfs://hadoop001:/input/data.log.snappy",3)
rdd1: org.apache.spark.rdd.RDD[String] = hdfs://hadoop001:/input/data.log.snappy MapPartitionsRDD[3] at textFile at <console>:24

scala> rdd1.getNumPartitions
res2: Int = 1

Note: for this file the partition count stays at 1 whether or not the parameter is specified. The underlying reason is that .snappy (like .gz) is not a splittable compression format, so the whole file must be read as a single partition; for a splittable file smaller than one block, an explicit minPartitions would still take effect.

4. Default Partition Count for Local Files

In practice local files are rarely used; most data comes from external sources such as HDFS or databases.

4.1 A large local file

[hadoop@hadoop001 data]$ du -sh  data_back.log
36M     data_back.log

scala> val rdd4 = sc.textFile("file:///home/hadoop/data/data_back.log")
rdd4: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/data/data_back.log MapPartitionsRDD[9] at textFile at <console>:24

scala> rdd4.getNumPartitions
res5: Int = 2

scala> val rdd4 = sc.textFile("file:///home/hadoop/data/data_back.log",20)
rdd4: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/data/data_back.log MapPartitionsRDD[11] at textFile at <console>:24

scala> rdd4.getNumPartitions
res6: Int = 20

scala> val rdd4 = sc.textFile("file:///home/hadoop/data/data_back.log",1)
rdd4: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/data/data_back.log MapPartitionsRDD[13] at textFile at <console>:24

scala> rdd4.getNumPartitions
res7: Int = 2

Note: the file is 36 MB yet it is split into 2 partitions. Local files are split using a fixed 32 MB block size rather than the 128 MB HDFS block size; and because the second argument is only a minimum, asking for 1 partition still yields 2.

4.2 A small local file

[hadoop@hadoop001 data]$ du -sh emp.txt
4.0K    emp.txt

scala> val rdd5 = sc.textFile("file:///home/hadoop/data/emp.txt")
rdd5: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/data/emp.txt MapPartitionsRDD[15] at textFile at <console>:24

scala> rdd5.getNumPartitions
res8: Int = 2

scala> sc.defaultParallelism
res9: Int = 4

scala> val rdd5 = sc.textFile("file:///home/hadoop/data/emp.txt",3)
rdd5: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/data/emp.txt MapPartitionsRDD[17] at textFile at <console>:24

scala> rdd5.getNumPartitions
res10: Int = 3

scala> val rdd5 = sc.textFile("file:///home/hadoop/data/emp.txt",1)
rdd5: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/data/emp.txt MapPartitionsRDD[19] at textFile at <console>:24

scala> rdd5.getNumPartitions
res11: Int = 1

Note: when the file is smaller than 32 MB, it is split into the default number of partitions, i.e. sc.defaultMinPartitions (2 here). You can also pass a partition count explicitly, in which case the specified value is used.

**Looking at org.apache.hadoop.fs.FileSystem in the Hadoop source, the default block size for local files is 32 MB:**

  /**
   * Return the number of bytes that large input files should be optimally
   * be split into to minimize i/o time.
   * @deprecated use {@link #getDefaultBlockSize(Path)} instead
   */
  @Deprecated
  public long getDefaultBlockSize() {
    // default to 32MB: large enough to minimize the impact of seeks
    return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
  }
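
If needed, that value can be read or overridden through the Hadoop configuration Spark uses when reading files. A small sketch (the 64 MB override is only an example, and the exact effect may vary across Hadoop versions):

// Read the local-filesystem block size (falls back to 32 MB when unset).
val localBlockSize = sc.hadoopConfiguration.getLong("fs.local.block.size", 32L * 1024 * 1024)

// Example override to 64 MB before reading a file:// path with textFile.
sc.hadoopConfiguration.setLong("fs.local.block.size", 64L * 1024 * 1024)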

5. Creating an RDD with parallelize

5.1 Without specifying the partition count

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[21] at parallelize at <console>:24

scala> rdd1.getNumPartitions
res14: Int = 4

scala> sc.defaultParallelism
res15: Int = 4

5.2 Specifying the partition count

scala> val rdd1 = sc.parallelize(List(1,2,3,4,5),3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at parallelize at <console>:24

scala> rdd1.getNumPartitions
res16: Int = 3

scala> rdd1.glom.collect
res32: Array[Array[Int]] = Array(Array(1), Array(2, 3), Array(4, 5))

5.3 Which elements go into each partition with parallelize

First, look at the definition in the source:

 /** Distribute a local Scala collection to form an RDD.
   *
   * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call
   * to parallelize and before the first action on the RDD, the resultant RDD will reflect the
   * modified collection. Pass a copy of the argument to avoid this.
   * @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
   * RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
   * @param seq Scala collection to distribute
   * @param numSlices number of partitions to divide the collection into
   * @return RDD representing distributed collection
   */
  def parallelize[T: ClassTag](
      seq: Seq[T],
      numSlices: Int = defaultParallelism): RDD[T] = withScope {
    assertNotStopped()
    new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
  }

So parallelize also uses defaultParallelism to determine the number of partitions.

The slicing logic is in ParallelCollectionRDD.scala:

private object ParallelCollectionRDD {
  /**
   * Slice a collection into numSlices sub-collections. One extra thing we do here is to treat Range
   * collections specially, encoding the slices as other Ranges to minimize memory cost. This makes
   * it efficient to run Spark over RDDs representing large sets of numbers. And if the collection
   * is an inclusive Range, we use inclusive range for the last slice.
   */
  def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    if (numSlices < 1) {
      throw new IllegalArgumentException("Positive number of partitions required")
    }
    // Sequences need to be sliced at the same set of index positions for operations
    // like RDD.zip() to behave as expected
    def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
      (0 until numSlices).iterator.map { i =>
        val start = ((i * length) / numSlices).toInt
        val end = (((i + 1) * length) / numSlices).toInt
        (start, end)
      }
    }
    // ... the rest of slice, which builds the sub-collections from these (start, end) ranges, is omitted ...
  }
}

Based on the positions function above, we can work out by hand which elements val rdd1 = sc.parallelize(List(1,2,3,4,5),3) puts into each partition (length = 5, numSlices = 3):

Partition 0: start = 0*5/3 = 0, end = (0+1)*5/3 = 1
The index range is [0, 1), so it holds the element (1); the end index is exclusive.

Partition 1: start = 1*5/3 = 1, end = (1+1)*5/3 = 3
The index range is [1, 3), so it holds the elements (2, 3).

Partition 2: start = 2*5/3 = 3, end = (2+1)*5/3 = 5
The index range is [3, 5), so it holds the elements (4, 5).

This matches the rdd1.glom.collect output above: Array(Array(1), Array(2, 3), Array(4, 5)).
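
A quick way to double-check this by hand is to re-implement the positions helper and print each (start, end) range together with the slice it selects. A small verification sketch:

// Re-implementation of the positions logic above, for verification only.
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] =
  (0 until numSlices).iterator.map { i =>
    (((i * length) / numSlices).toInt, (((i + 1) * length) / numSlices).toInt)
  }

val data = List(1, 2, 3, 4, 5)
positions(data.length, 3).foreach { case (start, end) =>
  println(s"[$start, $end) -> ${data.slice(start, end)}")
}
// [0, 1) -> List(1)
// [1, 3) -> List(2, 3)
// [3, 5) -> List(4, 5)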