mapWith 在spark1.0之后就过期了,使用mapPartitionsWithIndex代替
====原来mapWith使用
mapWith是map的另外一个变种,map只需要一个输入函数,而mapWith有两个输入函数。它的定义如下:
def mapWith[A: ClassTag, U: ](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U): RDD[U]
第一个函数constructA是把RDD的partition index(index从0开始)作为输入,输出为新类型A;
第二个函数f是把二元组(T, A)作为输入(其中T为原RDD中的元素,A为第一个函数的输出),输出类型为U。
举例:把partition index 乘以10,然后加上2作为新的RDD的元素。
val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3) x.mapWith(a => a * 10)((a, b) => (b + 2)).collect res4: Array[Int] = Array(2, 2, 2, 12, 12, 12, 22, 22, 22, 22)
====新的mapPartitionsWithIndex使用
mapPartitionsWithIndex(func) Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.
类似于mapPartitions, 其函数原型是:
def mapPartitionsWithIndex [ U : ClassTag ]( f : ( Int , Iterator [ T ]) => Iterator [ U ] , preservesPartitioning : Boolean = false ) : RDD [ U ],mapPartitionsWithIndex的func接受两个参数,第一个参数是分区的索引,第二个是一个数据集分区的迭代器。而输出的是一个包含经过该函数转换的迭代器。下面测试中,将分区索引和分区数据一起输出。Test:val x = sc . parallelize ( List (1 ,2 ,3 ,4 ,5 ,6 ,7 ,8 ,9 ,10) , 3)def myfunc ( index : Int , iter : Iterator [ Int ]) : Iterator [ String ] = {iter . toList . map ( x => index + "-" + x ) . iterator}//myfunc: (index: Int, iter: Iterator[Int])Iterator[String]x . mapPartitionsWithIndex ( myfunc ) . collect()res: Array[String] = Array(0-1, 0-2, 0-3, 1-4, 1-5, 1-6, 2-7, 2-8, 2-9, 2-10)
可以运行的类:
package yanan.spark.core.transformations.exampleimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextobject TransformationsTest { def mapPartitionsFunc[T](iter: Iterator[T]): Iterator[(T, T)] = { var res = List[(T, T)]() var pre = iter.next while (iter.hasNext) { val cur = iter.next res.::=(pre, cur) pre = cur; } res.iterator } def mapPartitionsTest(sc: SparkContext) = { val a = sc.parallelize(1 to 9, 3) a.mapPartitions(mapPartitionsFunc).collect.foreach(println) } def mapValuesTest(sc: SparkContext) = { val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2) val b = a.map(x => (x.length, x)) b.mapValues("x" + _ + "x").collect.foreach(println) } def mapWithTest(sc: SparkContext) = { val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3) x.mapWith(a => a * 10)((a, b) => (b + 2)).collect.foreach(println) //res4: Array[Int] = Array(2, 2, 2, 12, 12, 12, 22, 22, 22, 22) //mapWith 过期了,使用mapPartitionsWithIndex代替 val parallel = sc.parallelize(1 to 9) parallel.mapPartitionsWithIndex((index: Int, it: Iterator[Int]) => it.toList.map(x => index + ", " + x).iterator).collect // parallel.collect.foreach(println) val y = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3) y.mapPartitionsWithIndex(myfuncPartitionsWithIndex).collect() //res: Array[String] = Array(0-1, 0-2, 0-3, 1-4, 1-5, 1-6, 2-7, 2-8, 2-9, 2-10) } def myfuncPartitionsWithIndex(index: Int, iter: Iterator[Int]): Iterator[String] = { iter.toList.map(x => index + "-" + x).iterator } def main(args: Array[String]) { val conf = new SparkConf().setAppName(s"Book example: Scala").setMaster("local[2]") val sc = new SparkContext(conf) //mapPartitionsTest(sc) mapWithTest(sc) sc.stop() }}
参考:
http://debugo.com/spark-programming-model/