1. reduceByKey
1.1 Description in the source code
/**
 * Merge the values for each key using an associative and commutative reduce function. This will
 * also perform the merging locally on each mapper before sending results to a reducer, similarly
 * to a "combiner" in MapReduce.
 */
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
/**
 * Merge the values for each key using an associative and commutative reduce function. This will
 * also perform the merging locally on each mapper before sending results to a reducer, similarly
 * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
 */
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
  reduceByKey(new HashPartitioner(numPartitions), func)
}
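The excerpt shows reduceByKey delegating to combineByKeyWithClassTag with an identity createCombiner and with func used both to merge a value into a combiner and to merge two combiners. The following is a minimal plain-Scala sketch of that idea, not Spark's implementation: partitions are modeled as nested Seqs, and the names ReduceByKeyModel and combine are hypothetical.

```scala
// Plain-Scala model of reduceByKey; no Spark dependency.
// Assumption: a "partition" is just a Seq of (key, value) records.
object ReduceByKeyModel {

  // Generic combine step: fold (key, value) records into one combiner per key.
  private def combine[K, V, C](records: Seq[(K, V)],
                               createCombiner: V => C,
                               mergeValue: (C, V) => C): Map[K, C] =
    records.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      val merged = acc.get(k) match {
        case Some(c) => mergeValue(c, v)   // key seen before: merge the value in
        case None    => createCombiner(v)  // first value for this key
      }
      acc.updated(k, merged)
    }

  // Mirrors the shape of the excerpt: createCombiner is the identity and
  // func serves as both mergeValue and mergeCombiners.
  def reduceByKey[K, V](partitions: Seq[Seq[(K, V)]])(func: (V, V) => V): Map[K, V] = {
    // Step 1: map-side combine, run independently on every partition.
    val locallyCombined: Seq[Map[K, V]] =
      partitions.map(p => combine[K, V, V](p, v => v, func))
    // Step 2: bring the partial results together and merge them per key.
    combine[K, V, V](locallyCombined.flatMap(_.toSeq), v => v, func)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical sample data split over two partitions.
    val partitions = Seq(
      Seq("Hello" -> 1, "World" -> 1, "Hello" -> 1),
      Seq("Hello" -> 1, "China" -> 1)
    )
    println(reduceByKey(partitions)(_ + _))
  }
}
```

Because func is applied first within a partition and then again across partitions, in an order that is not fixed, it has to be associative and commutative, which is exactly the requirement stated in the doc comment.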
1.2 Test
scala> sc.textFile("file:///home/hadoop/data/wordcount.txt").flatMap(x=>x.split(",")).map((_,1)).reduceByKey(_+_).collect
res1: Array[(String, Int)] = Array((Hello,4), (World,3), (China,2), (Hi,1))
2. groupByKey
2.1 Description in the source code
/**
 * Group the values for each key in the RDD into a single sequence. Hash-partitions the
 * resulting RDD with the existing partitioner/parallelism level. The ordering of elements
 * within each group is not guaranteed, and may even differ each time the resulting RDD is
 * evaluated.
 *
 * @note This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
 * or `PairRDDFunctions.reduceByKey` will provide much better performance.
 */
def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
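The note above recommends aggregateByKey when the goal is an aggregation such as an average. As a rough illustration, the sketch below models the zeroValue / seqOp / combOp contract of aggregateByKey with plain Scala collections and uses a (sum, count) accumulator to compute a per-key average. It is a model under stated assumptions, not Spark code: partitions are nested Seqs, and AggregateByKeyModel and averageByKey are hypothetical names.

```scala
// Plain-Scala model of the aggregateByKey contract; no Spark dependency.
object AggregateByKeyModel {

  // zeroValue seeds each key's accumulator inside a partition,
  // seqOp folds a value into an accumulator,
  // combOp merges accumulators that come from different partitions.
  def aggregateByKey[K, V, U](partitions: Seq[Seq[(K, V)]], zeroValue: U)(
      seqOp: (U, V) => U, combOp: (U, U) => U): Map[K, U] = {
    // Per-partition aggregation (what would happen before the shuffle).
    val perPartition: Seq[Map[K, U]] = partitions.map { p =>
      p.foldLeft(Map.empty[K, U]) { case (acc, (k, v)) =>
        acc.updated(k, seqOp(acc.getOrElse(k, zeroValue), v))
      }
    }
    // Merge the per-partition accumulators key by key.
    perPartition.flatMap(_.toSeq).foldLeft(Map.empty[K, U]) { case (acc, (k, u)) =>
      acc.updated(k, acc.get(k).map(combOp(_, u)).getOrElse(u))
    }
  }

  // Average per key: carry (sum, count) and divide only at the end.
  def averageByKey[K](partitions: Seq[Seq[(K, Double)]]): Map[K, Double] = {
    val sumsAndCounts = aggregateByKey[K, Double, (Double, Long)](partitions, (0.0, 0L))(
      (acc, v) => (acc._1 + v, acc._2 + 1L),
      (a, b) => (a._1 + b._1, a._2 + b._2)
    )
    sumsAndCounts.map { case (k, (sum, n)) => k -> (sum / n) }
  }
}
```

The accumulator type differs from the value type here, which is the case reduceByKey cannot express directly: an average is not associative on its own, but the (sum, count) pair is.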
2.2 Test
scala> val data = sc.textFile("file:///home/hadoop/data/wordcount.txt").flatMap(x=>x.split(",")).map((_,1)).groupByKey()
data: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[9] at groupByKey at <console>:24
scala> data.collect
res2: Array[(String, Iterable[Int])] = Array((Hello,CompactBuffer(1, 1, 1, 1)), (World,CompactBuffer(1, 1, 1)), (China,CompactBuffer(1, 1)), (Hi,CompactBuffer(1)))
scala> data.map(x=>(x._1,x._2.sum)).collect
res4: Array[(String, Int)] = Array((Hello,4), (World,3), (China,2), (Hi,1))
### Method 2: using reduce(_+_)
scala> val data = sc.textFile("file:///home/hadoop/data/wordcount.txt").flatMap(x=>x.split(",")).map((_,1)).groupByKey()
data: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[17] at groupByKey at <console>:24
scala> data.map(x=>(x._1,x._2.reduce(_+_)))
res16: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[18] at map at <console>:26
scala> data.map(x=>(x._1,x._2.reduce(_+_))).collect
res17: Array[(String, Int)] = Array((Hello,4), (World,3), (China,2), (Hi,1))
2.3 Differences between reduceByKey and groupByKey
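The doc comments quoted in sections 1.1 and 2.1 already state the main difference: reduceByKey merges values locally on each mapper before sending results to a reducer, while groupByKey collects every value for a key into one sequence and is flagged as potentially very expensive. A rough way to see the effect is to count how many records would have to cross the shuffle in each case. The sketch below does that on plain Scala collections. It is a simplified model, not a measurement of Spark: partitions are nested Seqs, and ShuffleVolumeModel and its method names are hypothetical.

```scala
// Simplified model of shuffle volume; no Spark dependency.
object ShuffleVolumeModel {

  // groupByKey: every (key, value) record is sent across the shuffle unchanged.
  def recordsShuffledByGroupByKey[K, V](partitions: Seq[Seq[(K, V)]]): Int =
    partitions.map(_.size).sum

  // reduceByKey: each partition is first collapsed to one record per distinct key,
  // so only that many records leave the partition.
  def recordsShuffledByReduceByKey[K, V](partitions: Seq[Seq[(K, V)]]): Int =
    partitions.map(_.map(_._1).distinct.size).sum

  def main(args: Array[String]): Unit = {
    // Hypothetical word-count input split over two partitions.
    val partitions = Seq(
      Seq("Hello" -> 1, "Hello" -> 1, "World" -> 1),
      Seq("Hello" -> 1, "Hello" -> 1, "Hi" -> 1)
    )
    println(s"groupByKey:  ${recordsShuffledByGroupByKey(partitions)} records")
    println(s"reduceByKey: ${recordsShuffledByReduceByKey(partitions)} records")
  }
}
```

Both operators give the same word counts in the tests above. The gap in shuffled records grows with the number of repeated keys per partition, which is why the source recommends reduceByKey or aggregateByKey for aggregations.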
2.4 The PairRDDFunctions class
/**
 * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
 */
class PairRDDFunctions[K, V](self: RDD[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
extends Logging with Serializable {
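The doc comment says these functions become available "through an implicit conversion", which is why reduceByKey and groupByKey can be called on an RDD of pairs even though they are not defined on RDD itself. The sketch below shows the same enrichment pattern on an ordinary Seq. It is only an analogy: PairOps and PairSeqFunctions are hypothetical names, and the method body is a simple fold rather than anything Spark does.

```scala
// The "extra methods via implicit conversion" pattern, shown on Seq instead of RDD.
object PairOps {

  // Hypothetical enrichment class: only applies to sequences of (key, value) pairs,
  // in the same way PairRDDFunctions only applies to RDD[(K, V)].
  implicit class PairSeqFunctions[K, V](self: Seq[(K, V)]) {
    def reduceByKey(func: (V, V) => V): Map[K, V] =
      self.foldLeft(Map.empty[K, V]) { case (acc, (k, v)) =>
        acc.updated(k, acc.get(k).map(func(_, v)).getOrElse(v))
      }
  }
}

object PairOpsDemo {
  import PairOps._

  def main(args: Array[String]): Unit = {
    val words = Seq("Hello", "World", "Hello")
    // reduceByKey is not a member of Seq; the compiler wraps the Seq[(String, Int)]
    // in PairSeqFunctions to find it.
    println(words.map((_, 1)).reduceByKey(_ + _))
  }
}
```

Calling reduceByKey on a Seq[String] would not compile, since the implicit class only matches element types of the form (K, V).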
3. The count operator
/**
 * Return the number of elements in the RDD.
 */
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
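Read literally, the one-liner runs a job that computes the size of each partition's iterator and then sums the per-partition results. A minimal sketch of that two-step shape on plain Scala collections follows. CountModel and iteratorSize are hypothetical stand-ins for the job runner and for Utils.getIteratorSize, and partitions are modeled as nested Seqs.

```scala
// Plain-Scala model of count; no Spark dependency.
object CountModel {

  // Stand-in for Utils.getIteratorSize: walk the iterator and count its elements.
  def iteratorSize[T](iter: Iterator[T]): Long = {
    var n = 0L
    while (iter.hasNext) {
      iter.next()
      n += 1L
    }
    n
  }

  // Stand-in for sc.runJob(...).sum: one size per partition, then add them up.
  def count[T](partitions: Seq[Seq[T]]): Long =
    partitions.map(p => iteratorSize(p.iterator)).sum
}
```

Only one Long per partition has to be returned in this model, so the result stays small no matter how many elements the partitions hold.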
4. The collect operator
/**
 * Return an array that contains all of the elements in this RDD.
 *
 * @note This method should only be used if the resulting array is expected to be small, as
 * all the data is loaded into the driver's memory.
 */
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
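The body of collect has the same two-step shape as count: turn each partition into an array, then concatenate the arrays. The sketch below reproduces that on plain Scala collections to show why the note warns about driver memory. CollectModel is a hypothetical name and partitions are modeled as nested Seqs.

```scala
import scala.reflect.ClassTag

// Plain-Scala model of collect; no Spark dependency.
object CollectModel {

  // A ClassTag is needed because building an Array[T] requires the element type at runtime.
  def collect[T: ClassTag](partitions: Seq[Seq[T]]): Array[T] = {
    // One array per partition (what each task would send back).
    val results: Seq[Array[T]] = partitions.map(p => p.iterator.toArray[T])
    // All arrays are joined into a single array holding every element.
    Array.concat(results: _*)
  }
}
```

Unlike count, the result here grows with the data: every element ends up in one array, which in Spark lives in the driver's memory.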