2022-10-08
PysparkNote005 --- Batch Writing to Redis
Intro
There are two main points to consider when batch-writing to Redis:

1. Splitting the data into batches: Spark's foreachPartition
2. Inserting each batch into Redis: submitting the commands through a Redis pipeline (a minimal sketch follows this list)
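As a minimal sketch of the pipeline idea (assuming a local Redis at 127.0.0.1:6379 with no password; the demo_ keys are purely illustrative), commands are buffered on the client and sent in a single round trip when execute() is called:

import redis

r = redis.Redis(host="127.0.0.1", port=6379, db=0, decode_responses=True)
pipe = r.pipeline()
for i in range(10):
    pipe.set(f"demo_{i}", i)   # commands are buffered locally, nothing is sent yet
results = pipe.execute()       # a single round trip submits all 10 SET commands
print(results)                 # [True, True, ..., True]

One network round trip per batch instead of one per command is what makes the bulk load fast; the full code below applies the same pattern to hset/expire.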
Code
Let's go straight to the code.
import redis
import json
import random
import pandas as pd
import functools  # used below for functools.partial
key_list = [f"test_{str(i).zfill(3)}" for i in range(100)]value_list = [json.dumps({"name":"jack","sex":"male"})] * 100df = pd.DataFrame({"key":key_list,"value":value_list})
df.head()
|   | key      | value                           |
|---|----------|---------------------------------|
| 0 | test_000 | {"name": "jack", "sex": "male"} |
| 1 | test_001 | {"name": "jack", "sex": "male"} |
| 2 | test_002 | {"name": "jack", "sex": "male"} |
| 3 | test_003 | {"name": "jack", "sex": "male"} |
| 4 | test_004 | {"name": "jack", "sex": "male"} |
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()  # create the session if one is not already provided (e.g., outside a notebook)
spark_df = spark.createDataFrame(df)
spark_df.show()
+--------+--------------------+
|     key|               value|
+--------+--------------------+
|test_000|{"name": "jack", ...|
|test_001|{"name": "jack", ...|
|test_002|{"name": "jack", ...|
|test_003|{"name": "jack", ...|
|test_004|{"name": "jack", ...|
|test_005|{"name": "jack", ...|
|test_006|{"name": "jack", ...|
|test_007|{"name": "jack", ...|
|test_008|{"name": "jack", ...|
|test_009|{"name": "jack", ...|
|test_010|{"name": "jack", ...|
|test_011|{"name": "jack", ...|
|test_012|{"name": "jack", ...|
|test_013|{"name": "jack", ...|
|test_014|{"name": "jack", ...|
|test_015|{"name": "jack", ...|
|test_016|{"name": "jack", ...|
|test_017|{"name": "jack", ...|
|test_018|{"name": "jack", ...|
|test_019|{"name": "jack", ...|
+--------+--------------------+
only showing top 20 rows
def insert2redis(part, batch=50, expire_time=60):
    """
    @param part: rdd partition; rows with two fields, key and value
    @param batch: number of records per pipeline flush
    @param expire_time: key expiration time in seconds
    @return:
    """
    db_param = {"host": '127.0.0.1', "port": 6379, "password": '12345', "db": 0}
    db = redis.Redis(host=db_param["host"], port=db_param["port"], password=db_param["password"],
                     db=db_param["db"], encoding='utf-8', decode_responses=True)
    pipe = db.pipeline()
    cnt = 0
    for row in part:
        pipe.hset(name=row["key"], mapping=json.loads(row["value"]))  # write the value as a dict (Redis hash)
        pipe.expire(name=row["key"], time=expire_time + random.randint(0, 5))  # randomize the expiry so keys don't all expire at once
        cnt = cnt + 1
        if cnt > 0 and cnt % batch == 0:
            pipe.execute()
            print(f"Rows {cnt - batch}-{cnt} inserted into Redis!")
    # flush the last chunk if it is smaller than a full batch
    if cnt % batch != 0:
        pipe.execute()
        print(f"Rows {cnt - cnt % batch}-{cnt} inserted into Redis!")
    pipe.close()
    db.close()
spark_df.repartition(3).rdd.foreachPartition(
    functools.partial(insert2redis, batch=100, expire_time=60)
)
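To sanity-check the write, one can read a key back from the same Redis instance (a quick verification sketch reusing the connection parameters assumed in insert2redis):

db = redis.Redis(host='127.0.0.1', port=6379, password='12345', db=0, decode_responses=True)
print(db.hgetall("test_000"))  # expected: {'name': 'jack', 'sex': 'male'}
print(db.ttl("test_000"))      # remaining TTL in seconds, roughly 60-65 right after the write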