微前端架构如何改变企业的开发模式与效率提升
1102
2022-10-07
spark sql 数据倾斜案例实操
项目场景:
最近数仓同学有个离线任务跑不动了,问题是总是出现MetadataFetchFailedException 的异常,最后导致任务被 kill。于是就帮忙解决一下。
原因分析:
查看了下该任务的历史执行记录(如下图),其实最近几天一直在失败,只是在重试后偶尔会成功一下
查看具体的报错日志,是shuffle 数据丢失导致的任务失败,已经询问过该同事,已经对该 spark sql 任务进行了多次调参,executor-memory 很大了任务也是同样的错误。
AY.hera.spark.conf=--master yarn --queue default --driver-memory 4g --executor-memory 20g --executor-cores 4 --num-executors 8 --conf spark.sql.broadcastTimeout=5000 --conf spark.yarn.executor.memoryOverhead=8G --conf spark.sql.shuffle.partitions=1600
大家应该知道shuffle分为 shuffle write 和 shuffle read 两个部分,其中shuffle write 会把数据 spill 到 container 的磁盘上,一般 shuffle read 的task个数都是由 spark.sql.shuffle.partitions 来控制的,该值的默认值是 200,如果该参数配置的过小,可能会导致某个task的read 的数据过大,导致 jvm crash 或者长时间处于 stw 状态,那么此时executor 就可能会导致 Failed to connect to host的异常。
这种问题的解决办法一般方式有:
增加task 并行度对于spark sql 任务我们可以通过加大spark.sql.shuffle.partitions 的值来增加task 的数量,以此来减少单个task 的shuffle read 数据量,但是这种方式的处理效果非常有限,比如某个key 有多达上亿的数据,还是会落到同一个,在上面可以看到该任务已经配置到1600 的并行度了,但是任务仍然报错。减少shuffle 数据该方案,一般是通过避免shuffle 过程来彻底解决问题,比如使用broadcast join,但是该方案一般是对于大表join 小表的场景使用。增加executor 内存通过增加executor-memory 内存,可以解决绝大部分问题,但是在数据倾斜场景下如果某个key 很大,也是属于治标不治本的操作
通过上面的 hera.spark.conf 参数我们可以看到目前该任务的 task 并行度为1600,executor-memory 为 20g。方案1 和 方案3已经尝试了,还是无法解决。 对于方案2,我们需要看下任务的 sql
WITH temp_qs_tzl_device_report_01 AS ( SELECT pid, dp_id FROM bi_ods_clear.table_log WHERE dt = '${yyyymmdd}'), temp_qs_tzl_device_report_02 AS ( SELECT id, NAME, schema_id FROM bi_ods.table_product WHERE dt = '${yyyymmdd}'), temp_qs_tzl_device_report_03 AS ( SELECT CODE, property, schema_id, dp_id FROM bi_ods.table_schema WHERE dt = '${yyyymmdd}') INSERT overwrite TABLE qs_tzl_device_report PARTITION (dt = '${yyyymmdd}') SELECT /*+ REPARTITION(400) */ a.pid AS product_id, b. NAME AS product_name, c. CODE AS property_code, c.propertyFROM temp_qs_tzl_device_report_01 aLEFT OUTER JOIN temp_qs_tzl_device_report_02 b ON a.pid = b.idLEFT OUTER JOIN temp_qs_tzl_device_report_03 c ON ( b.schema_id = c.schema_id AND a.dp_id = c.dp_id);
脚本内容很简单,主要涉及到三张表的 join,为了判断 方案2 是否可行,
查看任务的执行计划可以发现 table_log 表和 table_product 已经进行了broadcast hash join,join 后的结果与 table_schema 进行 sort merge join.
我们简单count一下三张表的数据
表名 | 数据量 |
bi_ods_clear.table_log | 843101945 |
bi_ods.table_product | 5097521 |
bi_ods.table_schema | 245870981 |
发现其实 table_schema 的数据量是很大的,已经达到了2.4亿条数据,如果强制进行进行
broadcast hash join 会直接导致container oom,查看 task 的 metircs 信息发现,某个 task 的shuffle read 处理了 14.8GB/13亿条 的数据,并且最后该task 执行失败
已经很明显的可以看出数据倾斜的问题了,剩下就是然后就是判断是那张表存在数据倾斜的现象。
select count(1) cnt, b.schema_id,a.dp_id from temp_qs_tzl_device_report_01 aleft outer join temp_qs_tzl_device_report_02 bon a.pid=b.id group by b.schema_id,a.dp_id order by cnt desc limit 100
已经测试过 table_log 和 table_log 与 table_productjoin 的结果表数据量是一致的,然后直接对这两张表的结果表进行 key 的 group by发现存在严重的数据倾斜。。排名第一的 key 多达一亿三千万数据,而第100的 count 只有 10000 多,在进行 shuffle 时会导致某个 task 的执行时间超慢,从而拖慢整个任务的进度,甚至导致 jvm crash。
解决方案:
数据倾斜的解决方案有很多种,网上一搜一大把,这里就不在讨论。我使用的是方法是:采样倾斜 key 使用随机前缀进行 join
具体步骤是:
对table_log 与table_productjoin 的join 结果表table_join_01 进行TABLESAMPLE 采样,取出倾斜key 表table_skew把table_join_01 与倾斜表table_skew 进行join,join 上的dp_id 增加0-100的随机前缀concat(cast(rand()*100 as int),'_',a.dp_id)把table_schema 与倾斜表table_skew 进行join,join 上的每条数据explode 膨胀成100 条数据,这100 条数据都按顺序附加一个0~100 的前缀此时就将原先相同的key 打散成100 份,分散到多个task 中去进行join 了。
具体代码如下:
WITH table_log AS ( SELECT pid, dp_id FROM bi_ods_clear.table_log WHERE dt = '${yyyymmdd}'), table_product AS ( SELECT id, NAME, schema_id FROM bi_ods.table_product WHERE dt = '${yyyymmdd}'), table_schema AS ( SELECT CODE, property, schema_id, dp_id FROM bi_ods.table_schema WHERE dt = '${yyyymmdd}'), table_join_01 AS ( SELECT , a.pid, a.dp_id, b. NAME, b.schema_id FROM table_log a LEFT OUTER JOIN table_product b ON a.pid = b.id), -- sample取样取出数量排名前100的倾斜key table_skew AS ( SELECT count(1) cnt, schema_id, dp_id FROM table_join_01 TABLESAMPLE (10 PERCENT) a GROUP BY schema_id, dp_id ORDER BY cnt DESC LIMIT 100), -- 驱动表倾斜key join对驱动表增加随机前缀rebuild_table_01 AS ( SELECT pid, NAME, a.schema_id, CASE WHEN b-t IS NOT NULL THEN concat( cast(rand() * 100 AS INT), '_', a.dp_id ) ELSE a.dp_id END dp_id FROM table_join_01 a LEFT JOIN table_skew b ON a.dp_id = b.dp_id AND a.schema_id = b.schema_id), -- 被驱动表倾斜key jointable_join_02 AS ( SELECT a.schema_id, a.dp_id, b-t cnt FROM table_schema a LEFT JOIN table_skew b ON a.dp_id = b.dp_id AND a.schema_id = b.schema_id), -- 被驱动表膨胀100倍 rebuild_table_02 AS ( SELECT CODE, property, schema_id, dp_id FROM table_join_02 WHERE cnt IS NULL UNION ALL SELECT CODE, property, schema_id, concat(b.prefix, '_', dp_id) prefix FROM table_join_02 lateral VIEW OUTER explode ( split ( '0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100', ',' ) ) b AS prefix WHERE cnt IS NOT NULL) INSERT overwrite TABLE qs_tzl_device_report_test PARTITION (dt = '${yyyymmdd}') SELECT /*+ REPARTITION(100) */ a.pid AS product_id, a. NAME AS product_name, c. CODE AS property_code, c.propertyFROM rebuild_table_01 aLEFT OUTER JOIN rebuild_table_02 c ON ( a.schema_id = c.schema_id AND a.dp_id = c.dp_id)
修改任务后执行发现任务的执行时间为 15 分钟附近,而在之前需要执行接近 90 分钟
并且使用的yarn资源也减少为之前的 1/2
关注我,随时获取最新文章哦
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~