洞察探索open banking如何通过小程序容器技术助力金融企业实现数据安全和数字化转型
602
2022-10-20
RDD转换操作算子 --- zip(k-v)、join(k)、cogroup(k)、lookup(k)
RDD转换操作算子 — zip、join
zip 用于将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常
@Test def test{ // zip 将两个RDD组合成(key,value)的形式 val a = sc.makeRDD(1 to 5,2) val b = sc.makeRDD(Seq("a","b","c","d","e"),2) val zip1: RDD[(Int, String)] = a.zip(b) val zip2: RDD[(String, Int)] = b.zip(a) zip1.collect().foreach(print(_))//(1,a)(2,b)(3,c)(4,d)(5,e) zip2.collect().foreach(print(_))//(a,1)(b,2)(c,3)(d,4)(e,5) sc.stop() }
join 将两个RDD按照相同的Key来进行连接
val a = sc.parallelize(Seq(("a",1),("b",2),("c",3))) val b = sc.parallelize(Seq(("a",1),("d",4),("e",5))) val join = a.join(b) val r_join = a.rightOuterJoin(b) val l_join = a.leftOuterJoin(b) val f_join = a.fullOuterJoin(b) join.collect().foreach(println(_)) /** * (a,(1,1)) */ r_join.collect().foreach(println(_)) /** * (a,(Some(1),1)) * (d,(None,4)) * (e,(None,5)) */ l_join.collect().foreach(println(_)) /** * (a,(1,Some(1))) * (b,(2,None)) * (c,(3,None)) */ f_join.collect().foreach(println(_)) /** * (a,(Some(1),Some(1))) * (b,(Some(2),None)) * (c,(Some(3),None)) * (d,(None,Some(4))) * (e,(None,Some(5))) */
cogroup (otherDataset, [numTasks])是将输入数据集(K, V)和另外一个数据集(K, W)进行cogroup,得到一个格式为(K, Seq[V], Seq[W])的数据集。
def cogroup(): Unit ={ val a = sc.parallelize(Seq(("aa",1),("bb",2),("cc",3))) val b = sc.parallelize(Seq(("ee",10),("dd",20),("aa",30))) val c = a.cogroup(b).collect() c.foreach(println(_)) /** * (cc,(CompactBuffer(3),CompactBuffer())) * (aa,(CompactBuffer(1),CompactBuffer(30))) * (dd,(CompactBuffer(),CompactBuffer(20))) * (ee,(CompactBuffer(),CompactBuffer(10))) * (bb,(CompactBuffer(2),CompactBuffer())) */ }
lookup 用于(K,V)类型的RDD,指定K值,返回RDD中该K对应的所有V值
@Test def test(): Unit ={ val m = sc.parallelize(Seq(("a",1),("b",2),("b",3),("a",11))) m.lookup("a").foreach(println(_))// 1// 11 m.lookup("b").foreach(println(_))// 2// 3 }
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~