cogroup
Source code
/**
* For each key k in `this` or `other1` or `other2` or `other3`,
* return a resulting RDD that contains a tuple with the list of values
* for that key in `this`, `other1`, `other2` and `other3`.
*/
def cogroup[W1, W2, W3](other1: RDD[(K, W1)],
    other2: RDD[(K, W2)],
    other3: RDD[(K, W3)],
    partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))] = self.withScope {
  if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
    throw new SparkException("HashPartitioner cannot partition array keys.")
  }
  val cg = new CoGroupedRDD[K](Seq(self, other1, other2, other3), partitioner)
  cg.mapValues { case Array(vs, w1s, w2s, w3s) =>
    (vs.asInstanceOf[Iterable[V]],
      w1s.asInstanceOf[Iterable[W1]],
      w2s.asInstanceOf[Iterable[W2]],
      w3s.asInstanceOf[Iterable[W3]])
  }
}
For each key k that appears in `this`, `other1`, `other2`, or `other3`, return a resulting RDD that contains one tuple per key, holding the values for that key from `this`, `other1`, `other2`, and `other3` respectively.
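A minimal spark-shell sketch of the four-RDD variant (the sample data below is hypothetical):

val base = sc.makeRDD(Array(("A", 1), ("B", 2)))
val o1 = sc.makeRDD(Array(("A", "a")))
val o2 = sc.makeRDD(Array(("B", "b")))
val o3 = sc.makeRDD(Array(("A", "x"), ("C", "y")))
// One tuple of four Iterables per key; an empty Iterable where a key is absent
base.cogroup(o1, o2, o3).collect
// e.g. (A,(CompactBuffer(1),CompactBuffer(a),CompactBuffer(),CompactBuffer(x)))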
join
Source code:
/**
* Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
* pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
* (k, v2) is in `other`. Performs a hash join across the cluster.
*/
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope {
  join(other, defaultPartitioner(self, other))
}
Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each pair of elements is returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and (k, v2) is in `other`. A hash join is performed across the cluster.
cogroup VS join
join test
val rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
val rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)
scala> rdd1.join(rdd2).collect
res0: Array[(String, (String, String))] = Array((A,(1,a)), (C,(3,c)))
Only keys that have matching values in both RDDs appear in the final result, the same behavior as an inner join in a relational database.
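The connection between the two operators is direct: an inner join can be expressed as a cogroup followed by a local cross product of the two value Iterables. The helper below is a simplified sketch, not the actual Spark source:

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Hypothetical helper: join expressed through cogroup
def joinViaCogroup[K: ClassTag, V: ClassTag, W: ClassTag](
    left: RDD[(K, V)], right: RDD[(K, W)]): RDD[(K, (V, W))] =
  left.cogroup(right).flatMapValues { case (vs, ws) =>
    for (v <- vs; w <- ws) yield (v, w) // keys missing on either side yield nothing
  }

On the rdd1/rdd2 above, joinViaCogroup(rdd1, rdd2).collect should produce the same pairs as rdd1.join(rdd2).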
cogroup test
val rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
val rdd2 = sc.makeRDD(Array(("A","a"),("C","c"),("D","d")),2)
scala> rdd1.cogroup(rdd2).collect
res0: Array[(String, (Iterable[String], Iterable[String]))] = Array(
(B,(CompactBuffer(2),CompactBuffer())),
(D,(CompactBuffer(),CompactBuffer(d))),
(A,(CompactBuffer(1),CompactBuffer(a))),
(C,(CompactBuffer(3),CompactBuffer(c)))
)
A key appears in the final result as long as it occurs in at least one of the two RDDs. This closely resembles a full outer join in a relational database.
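Spark also provides a fullOuterJoin operator, which returns Option values instead of Iterables; on the same rdd1/rdd2 it keeps exactly the keys that cogroup keeps:

scala> rdd1.fullOuterJoin(rdd2).collect
// Expected shape (ordering may vary):
// Array((B,(Some(2),None)), (D,(None,Some(d))), (A,(Some(1),Some(a))), (C,(Some(3),Some(c))))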
groupBy
Source code
/**
* Return an RDD of grouped items. Each group consists of a key and a sequence of elements
* mapping to that key. The ordering of elements within each group is not guaranteed, and
* may even differ each time the resulting RDD is evaluated.
*
* @note This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
* or `PairRDDFunctions.reduceByKey` will provide much better performance.
*/
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
  groupBy[K](f, defaultPartitioner(this))
}
Return an RDD of grouped items. Each group consists of a key and a sequence of elements mapping to that key. The ordering of elements within each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.
Test code
scala> val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[29] at parallelize at <console>:24
scala> a.groupBy(x => { if (x % 2 == 0) "even" else "odd" }).collect
res29: Array[(String, Iterable[Int])] = Array((even,CompactBuffer(2, 4, 6, 8)), (odd,CompactBuffer(1, 3, 5, 7, 9)))
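As the @note in the source warns, groupBy is expensive when the goal is an aggregation: it shuffles every element into its group. The same per-group sum can be computed with reduceByKey, which combines values map-side first. A sketch on the same RDD a:

// Sum per parity group without materializing the full groups
scala> a.map(x => (if (x % 2 == 0) "even" else "odd", x)).reduceByKey(_ + _).collect
// Expected: Array((even,20), (odd,25))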
Summary
cogroup and join are closely related: join behaves like an inner join in MySQL, while cogroup behaves like a full outer join. groupBy is different from both; it simply groups elements by key.