博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark Transformations之mapPartitionsWithIndex
阅读量:6368 次
发布时间:2019-06-23

本文共 3421 字,大约阅读时间需要 11 分钟。

hot3.png

mapWith 在spark1.0之后就过期了,使用mapPartitionsWithIndex代替

====原来mapWith使用

mapWith是map的另外一个变种,map只需要一个输入函数,而mapWith有两个输入函数。它的定义如下:

def mapWith[A: ClassTag, U: ](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U): RDD[U]
  • 第一个函数constructA是把RDD的partition index(index从0开始)作为输入,输出为新类型A;

  • 第二个函数f是把二元组(T, A)作为输入(其中T为原RDD中的元素,A为第一个函数的输出),输出类型为U。

举例:把partition index 乘以10,然后加上2作为新的RDD的元素。

val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3) x.mapWith(a => a * 10)((a, b) => (b + 2)).collect res4: Array[Int] = Array(2, 2, 2, 12, 12, 12, 22, 22, 22, 22)

====新的mapPartitionsWithIndex使用

mapPartitionsWithIndex(func) Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.

类似于mapPartitions, 其函数原型是:

def mapPartitionsWithIndex [ U : ClassTag ]( f : ( Int , Iterator [ T ]) => Iterator [ U ] , preservesPartitioning : Boolean = false ) : RDD [ U ],
mapPartitionsWithIndex的func接受两个参数,第一个参数是分区的索引,第二个是一个数据集分区的迭代器。而输出的是一个包含经过该函数转换的迭代器。下面测试中,将分区索引和分区数据一起输出。
Test:

val x = sc . parallelize ( List (1 ,2 ,3 ,4 ,5 ,6 ,7 ,8 ,9 ,10) , 3)def myfunc ( index : Int , iter : Iterator [ Int ]) : Iterator [ String ] = {iter . toList . map ( x => index + "-" + x ) . iterator}//myfunc: (index: Int, iter: Iterator[Int])Iterator[String]x . mapPartitionsWithIndex ( myfunc ) . collect()res: Array[String] = Array(0-1, 0-2, 0-3, 1-4, 1-5, 1-6, 2-7, 2-8, 2-9, 2-10)

可以运行的类:

package yanan.spark.core.transformations.exampleimport org.apache.spark.SparkConfimport org.apache.spark.SparkContextobject TransformationsTest {  def mapPartitionsFunc[T](iter: Iterator[T]): Iterator[(T, T)] = {    var res = List[(T, T)]()    var pre = iter.next    while (iter.hasNext) {      val cur = iter.next      res.::=(pre, cur)      pre = cur;    }    res.iterator  }  def mapPartitionsTest(sc: SparkContext) = {    val a = sc.parallelize(1 to 9, 3)    a.mapPartitions(mapPartitionsFunc).collect.foreach(println)  }  def mapValuesTest(sc: SparkContext) = {    val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)    val b = a.map(x => (x.length, x))    b.mapValues("x" + _ + "x").collect.foreach(println)  }  def mapWithTest(sc: SparkContext) = {    val x = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3)    x.mapWith(a => a * 10)((a, b) => (b + 2)).collect.foreach(println)    //res4: Array[Int] = Array(2, 2, 2, 12, 12, 12, 22, 22, 22, 22)    //mapWith 过期了,使用mapPartitionsWithIndex代替    val parallel = sc.parallelize(1 to 9)    parallel.mapPartitionsWithIndex((index: Int, it: Iterator[Int]) => it.toList.map(x => index + ", " + x).iterator).collect   // parallel.collect.foreach(println)        val y = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3)    y.mapPartitionsWithIndex(myfuncPartitionsWithIndex).collect()    //res: Array[String] = Array(0-1, 0-2, 0-3, 1-4, 1-5, 1-6, 2-7, 2-8, 2-9, 2-10)  }  def myfuncPartitionsWithIndex(index: Int, iter: Iterator[Int]): Iterator[String] = {    iter.toList.map(x => index + "-" + x).iterator  }  def main(args: Array[String]) {    val conf = new SparkConf().setAppName(s"Book example: Scala").setMaster("local[2]")    val sc = new SparkContext(conf)    //mapPartitionsTest(sc)    mapWithTest(sc)    sc.stop()  }}

参考:

http://debugo.com/spark-programming-model/

转载于:https://my.oschina.net/forrest420/blog/470511

你可能感兴趣的文章
HDFS 进化,Hadoop 即将拥抱对象存储?
查看>>
Edge 浏览器奇葩 bug:“123456”打印成“114447”
查看>>
Sirius —— 开源版的 Siri ,由 Google 支持
查看>>
《OpenGL ES应用开发实践指南:Android卷》—— 2.7 小结
查看>>
《Windows Server 2012活动目录管理实践》——第 2 章 部署第一台域控制器2.1 案例任务...
查看>>
Java Date Time 教程-时间测量
查看>>
Selector.wakeup实现注记
查看>>
《Java EE 7精粹》—— 第1章 Java EE 1.1 简介
查看>>
《Exchange Server 2013 SP1管理实践》——导读
查看>>
syslog:类Unix系统常用的log服务
查看>>
使用Annotation设计持久层
查看>>
深入实践Spring Boot2.4.1 Neo4j依赖配置
查看>>
Zen Cart 如何添加地址栏上的小图标
查看>>
SecureCrt 连接Redhat linux
查看>>
[NHibernate]持久化类(Persistent Classes)
查看>>
如何在Hive中使用Json格式数据
查看>>
linux如何恢复被删除的热文件
查看>>
Eclipse(MyEclipse) 自动补全
查看>>
Struts2中dispatcher与redirect的区别
查看>>
zabbix agentd configure
查看>>