微前端架构如何改变企业的开发模式与效率提升
667
2022-10-08
Spark005---map、mapPartitions
Intro
map、mapPartitions的使用和差异
map
import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder().master("local[*]").getOrCreate()
import org.apache.spark.sql.SparkSessionspark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@22ed5cd
val dataRDD = spark.sparkContext.makeRDD(List(1, 2, 3, 4), 2) val dataRDD1 = dataRDD.map( num => { println("step1<<<<<<<<" + num) num } ) val dataRDD2 = dataRDD1.map( num => { println("step2>>>>>>>>" + num) num } )println(dataRDD2.collect().toArray.mkString(","))
step1<<<<<<<<1step1<<<<<<<<3step2>>>>>>>>3step2>>>>>>>>1step1<<<<<<<<4step2>>>>>>>>4step1<<<<<<<<2step2>>>>>>>>21,2,3,4dataRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at
不同分区并行处理,1、3分属两个分区,同时执行对于某一个具体数据,同时执行完step1和step2对于同一个分区数据(1,2),顺次执行,1执行完step1、step2,2再执行step1、2
mapPartitions
val dataRDD3 = dataRDD.mapPartitions( data => { println("step1<<<<<<<<") // 转成array,方便查看数据 val data1 = data.map(num => num).toArray // println(data1.mkString(",")) println("step2>>>>>>>>") val data2 = data1.map(num => num * 2) println(data2.mkString(",")) data2.iterator } )println(dataRDD3.collect().toArray.mkString(","))
step1<<<<<<<
mapPartitions同一分区批处理,分区数据(1,2)整体执行step1、step2不同分区可以整体执行,所以可看到有两个step1
mapPartitionsWithIndex
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引 感觉平时并不怎么用,单纯记录下吧
val dataRDD4 = dataRDD.mapPartitionsWithIndex( (index, datas) => { datas.map(x => (index,x)) } ) println(dataRDD4.collect().toArray.mkString(","))
(0,1),(0,2),(1,3),(1,4)dataRDD4: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[4] at mapPartitionsWithIndex at
两个小测试
找出每个分区最大的值找出第二个分区的数据
val dataRDD5 = dataRDD.mapPartitionsWithIndex( (index, datas) => { List(s"index=${index},max=${datas.max}").toIterator } ) println(dataRDD5.collect().toArray.mkString(";"))
index=0,max=2;index=1,max=4dataRDD5: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at mapPartitionsWithIndex at
val dataRDD6 = dataRDD.mapPartitionsWithIndex( (index, datas) => { index match { case 1 => Array(1, datas.toArray).toIterator case _ => Nil.toIterator } } ) val arr = dataRDD6.collect().toArray val index = arr(0) val data = arr(1).asInstanceOf[Array[Int]] println(s"index=${index},data=${data.mkString(",")}")
index=1,data=3,4dataRDD6: org.apache.spark.rdd.RDD[Any] = MapPartitionsRDD[7] at mapPartitionsWithIndex at
总结
map和mapPartitionsWithIndex两种方法差异:
数据处理角度
Map算子是分区内一个数据一个数据的执行,类似于串行操作。而mapPartitions算子是以分区为单位进行批处理操作
功能的角度
Map 算子主要目的将数据源中的数据进行转换和改变,但是不会减少或增多数据MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据
性能的角度
Map算子因为类似于串行操作,所以性能比较低mapPartitions 算子类似于批处理,所以性能较高。单会长时间占用内存,在内存有限的情况下,不推荐使用
2021-11-17 于南京市江宁区九龙湖
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~