Spark005---map、mapPartitions

网友投稿 667 2022-10-08

Spark005---map、mapPartitions

Spark005---map、mapPartitions

Intro

map、mapPartitions的使用和差异

map

import org.apache.spark.sql.SparkSessionval spark = SparkSession.builder().master("local[*]").getOrCreate()

import org.apache.spark.sql.SparkSessionspark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@22ed5cd

val dataRDD = spark.sparkContext.makeRDD(List(1, 2, 3, 4), 2) val dataRDD1 = dataRDD.map( num => { println("step1<<<<<<<<" + num) num } ) val dataRDD2 = dataRDD1.map( num => { println("step2>>>>>>>>" + num) num } )println(dataRDD2.collect().toArray.mkString(","))

step1<<<<<<<<1step1<<<<<<<<3step2>>>>>>>>3step2>>>>>>>>1step1<<<<<<<<4step2>>>>>>>>4step1<<<<<<<<2step2>>>>>>>>21,2,3,4dataRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at :27dataRDD1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at :28dataRDD2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at map at :34

不同分区并行处理,1、3分属两个分区,同时执行对于某一个具体数据,同时执行完step1和step2对于同一个分区数据(1,2),顺次执行,1执行完step1、step2,2再执行step1、2

mapPartitions

val dataRDD3 = dataRDD.mapPartitions( data => { println("step1<<<<<<<<") // 转成array,方便查看数据 val data1 = data.map(num => num).toArray // println(data1.mkString(",")) println("step2>>>>>>>>") val data2 = data1.map(num => num * 2) println(data2.mkString(",")) data2.iterator } )println(dataRDD3.collect().toArray.mkString(","))

step1<<<<<<<>>>>>>>step2>>>>>>>>2,46,82,4,6,8dataRDD3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at mapPartitions at :27

mapPartitions同一分区批处理,分区数据(1,2)整体执行step1、step2不同分区可以整体执行,所以可看到有两个step1

mapPartitionsWithIndex

将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引 感觉平时并不怎么用,单纯记录下吧

val dataRDD4 = dataRDD.mapPartitionsWithIndex( (index, datas) => { datas.map(x => (index,x)) } ) println(dataRDD4.collect().toArray.mkString(","))

(0,1),(0,2),(1,3),(1,4)dataRDD4: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[4] at mapPartitionsWithIndex at :27

两个小测试

找出每个分区最大的值找出第二个分区的数据

val dataRDD5 = dataRDD.mapPartitionsWithIndex( (index, datas) => { List(s"index=${index},max=${datas.max}").toIterator } ) println(dataRDD5.collect().toArray.mkString(";"))

index=0,max=2;index=1,max=4dataRDD5: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at mapPartitionsWithIndex at :27

val dataRDD6 = dataRDD.mapPartitionsWithIndex( (index, datas) => { index match { case 1 => Array(1, datas.toArray).toIterator case _ => Nil.toIterator } } ) val arr = dataRDD6.collect().toArray val index = arr(0) val data = arr(1).asInstanceOf[Array[Int]] println(s"index=${index},data=${data.mkString(",")}")

index=1,data=3,4dataRDD6: org.apache.spark.rdd.RDD[Any] = MapPartitionsRDD[7] at mapPartitionsWithIndex at :27arr: Array[Any] = Array(1, Array(3, 4))index: Any = 1data: Array[Int] = Array(3, 4)

总结

map和mapPartitionsWithIndex两种方法差异:

数据处理角度

Map算子是分区内一个数据一个数据的执行,类似于串行操作。而mapPartitions算子是以分区为单位进行批处理操作

功能的角度

Map 算子主要目的将数据源中的数据进行转换和改变,但是不会减少或增多数据MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变,所以可以增加或减少数据

性能的角度

Map算子因为类似于串行操作,所以性能比较低mapPartitions 算子类似于批处理,所以性能较高。单会长时间占用内存,在内存有限的情况下,不推荐使用

2021-11-17 于南京市江宁区九龙湖

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Spark004-rdd分区逻辑
下一篇:微信小程序 教程之注册程序(微信小程序游戏开发)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~