PysparkNote004---foreachPartition的参数传递

网友投稿 1021 2022-09-02

PysparkNote004---foreachPartition的参数传递

PysparkNote004---foreachPartition的参数传递

Intro

pyspark批量写入数据库时,需要分批写入,批量写入时,只要建立一个连接,这样可以显著的提高写入速度。分批写入,容易想到foreachPartition,但是pyspark不能像scala那样

df.rdd.foreachPartition(x=>{...})

只支持

df.rdd.foreachPartition(you_function)

看下源码:

def foreachPartition(self, f): """ Applies a function to each partition of this RDD. >>> def f(iterator): ... for x in iterator: ... print(x) >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f) """ def func(it): r = f(it) try: return iter(r) except TypeError: return iter([]) self.mapPartitions(func).count() # Force evaluation

如果you_function想传入其他参数,需要通过偏函数的方式传入。其原理,简单但不一定正确的理解,就是通过偏函数,绑定参数,生产个新函数,供foreachPartition调用。直接看代码

Code

import pandas as pdimport functoolsfrom pyspark.sql import SparkSessiondf = pd.DataFrame({"x":list(range(10))})spark = SparkSession.builder.appName("pysaprk").getOrCreate()spark_df = spark.createDataFrame(df)spark_df.show()

+---+| x|+---+| 0|| 1|| 2|| 3|| 4|| 5|| 6|| 7|| 8|| 9|+---+

def test_f(part,id): for row in part: print(f"id={id},x={row['x']}")

spark_df.repartition(2).rdd.foreachPartition(functools.partial(test_f,id=0))

这样就可以把id参数传进去了

Ref

2022-04-24 于南京市江宁区九龙湖

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:PythonNote036---python中字典合并
下一篇:微服务架构优势在哪,与传统服务又有什么区别呢?(比微服务更好的架构)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~