PysparkNote005---批量写入Redis

网友投稿 726 2022-10-08

PysparkNote005---批量写入Redis

PysparkNote005---批量写入Redis

Intro

批量写redis,主要考虑两个点:

数据分批-spark的foreachPartition批数据的分批插入redis,redis pipeline提交

Code

直接看代码

import redisimport jsonimport randomimport pandas as pdimport

key_list = [f"test_{str(i).zfill(3)}" for i in range(100)]value_list = [json.dumps({"name":"jack","sex":"male"})] * 100df = pd.DataFrame({"key":key_list,"value":value_list})

df.head()

key

value

0

test_000

{"name": "jack", "sex": "male"}

1

test_001

{"name": "jack", "sex": "male"}

2

test_002

{"name": "jack", "sex": "male"}

3

test_003

{"name": "jack", "sex": "male"}

4

test_004

{"name": "jack", "sex": "male"}

from pyspark.sql import SparkSessionspark_df = spark.createDataFrame(df)spark_df.show()

+--------+--------------------+| key| value|+--------+--------------------+|test_000|{"name": "jack", ...||test_001|{"name": "jack", ...||test_002|{"name": "jack", ...||test_003|{"name": "jack", ...||test_004|{"name": "jack", ...||test_005|{"name": "jack", ...||test_006|{"name": "jack", ...||test_007|{"name": "jack", ...||test_008|{"name": "jack", ...||test_009|{"name": "jack", ...||test_010|{"name": "jack", ...||test_011|{"name": "jack", ...||test_012|{"name": "jack", ...||test_013|{"name": "jack", ...||test_014|{"name": "jack", ...||test_015|{"name": "jack", ...||test_016|{"name": "jack", ...||test_017|{"name": "jack", ...||test_018|{"name": "jack", ...||test_019|{"name": "jack", ...|+--------+--------------------+only showing top 20 rows

def insert2redis(part, batch=50, expire_time=60): """ @param part: rdd part;两列值key、value @param batch: 批量写入的数量 @param expire_time: 过期时间 @return: """ db_param = {"host": '127.0.0.1', "port": 6379, "password": '12345', "db": 0} db = redis.Redis(host=db_param["host"], port=db_param["port"], password=db_param["password"], db=db_param["db"], encoding='utf-8', decode_responses=True) pipe = db.pipeline() cnt = 0 for row in part: pipe.hset(name=row["key"], mapping=json.loads(row["value"])) # 以字典形式写入 pipe.expire(name=row["key"], time=expire_time + random.randint(0, 5)) # 过期时间随机化,防止批量过期 cnt = cnt + 1 if cnt > 0 and cnt % batch == 0: pipe.execute() print(f"第{cnt - batch}-{cnt}行数据插入redis!") # 最后一波数据如果不是batch余数,也推过去 if cnt % batch != 0: pipe.execute() print(f"第{cnt - cnt % batch}-{cnt}行数据插入redis!") pipe.close() db.close()

spark_df.repartition(3).rdd.foreachPartition( functools.partial(insert2redis, batch=100, expire_time=60))

Ref

​​[1] 于南京市江宁区九龙湖

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:EFX小程序框架
下一篇:Okam- 小程序开发框架(okamoto是什么牌子)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~