深入剖析 Delta Lake: MySQL CDC 实战

网友投稿 712 2022-09-02

深入剖析 Delta Lake: MySQL CDC 实战

深入剖析 Delta Lake: MySQL CDC 实战

前言

在初建大数据平台时,我想大家都遇到过这样一个需求,​​mysql 的数据如何同步到 hdfs 中供数仓分析使用​​

T+1架构

在早期,业务对实时性的要求可能还不高,大家的计算也都是 ​​T+1​​​ 的离线计算,当然也可能有 ​​H+1​​​ 的准实时计算。大家的选型可能是 ​​sqoop​​​、​​datax​​​ 等工具。此时只需要在离线调度平台上配置每天/每小时拉取 ​​mysql​​​ 数据同步到 ​​hive​​ 的任务即可。

但该方式也有一些弊端,如:

实时性较差全量抽取、尤其是每小时一次的抽取对​​mysql​​ 的压力很大单表数据量过大时(上亿数据),​​dba​​​ 不允许全量抽取,改为增量抽取后,​​hive​​ 端需要和历史数据做合并。如果增量和历史全量合并执行时长更久,如果增量和最近几个月合并可能导致脏数据涉及到分库分表抽取时难度较大,工具支持的并不完美,需要在​​hive​​ 端做合并,操作复杂

H+1架构

在 ​​18​​​ 年那会,我想以上的这种方式应该是很多公司都在使用的,即使是现在也有公司在采用。 在此架构的基础上,有些公司可能会有一些大表数据需要做 ​​​H+1​​​ 的准实时需求,面临的问题是:如果仍然每个小时抽取大表数据,而拉取数据时间又可能很长,会导致 ​​mysql​​​ 实例负载持续处于最高水位,造成 ​​mysql​​​ 服务不可用,即使是在从库拉取,也会导致从库挂掉,​​mysql​​ 同步延迟等等问题,所以此时就诞生了一种新的同步方式。

该种架构需要在 ​​mysql​​​ 端部署 ​​canal​​​ 等能够接收 ​​mysql​​​ 主库的 ​​binlog​​​ 并且能解析 ​​binlog​​​ 的工具。然后把 ​​binlog​​​ 根据库或者业务类型发送到不同的 ​​kafka​​​ 等 ​​mq​​​ 队列。而在下游,需要数据平台开发人员使用 ​​spark​​​ 或者​​flink​​​ 等实时框架开发相应的消费代码,把 ​​kafka​​​ 的 ​​binlog​​​ 变更数据写入/更新到 ​​hbase​​​ 或者 ​​kudu​​​具有更新的​​db​​​。在后面就是在离线调度平台使用 ​​hive​​​ 或者​​spark sql​​​来进行 ​​etl​​ 计算了。

当然,该种架构还是有一定的弊端

只是准实时,还是需要在离线调度平台每小时的抽取​​hbase​​ 数据把对​​mysql​​​ 的抽取压力,转移到了​​hbase​​​ 端。如果也有业务在用​​hbase​​,每小时的高峰期会导致业务读/写请求变慢如果每个小时从​​hbase​​​ 全量抽取,走的​​hbase​​​ 的​​scan​​​ 命令,数据量大的时候极慢,可能会导致后续的整个链路超出1个小时,甚至变成​​H+n​​。如果​​hbase​​​ 只存近几天的增量数据,然后使用​​row_number() over​​​ 和​​hive​​ 的全量进行合并,也可能会导致整个链路过长,如果合并最近几个月的全量数据,可能会产生脏数据。

实时方案

以前受限于 ​​parquet​​​ 文件写入 ​​HDFS​​​ 文件后,要想更新数据,就只能全量写新的数据,成本很高,并且 ​​HDFS​​​ 的设计就是一次写入多次读取。都 ​​2020​​​ 年了,到底有没有一种架构能够在分钟级别采集到 ​​HDFS​​ 呢?

很明显,是有的。据我所知,目前能够支持 ​​hdfs​​​ 更新的工具有 ​​delta lake​​​、​​hudi​​​和 ​​iceberg​​​(这三款都是数据湖框架,更新也是数据湖数据的更新,但是能够通过其它引擎读取,比如​​spark、hive、presto​​​)。但是目前 ​​iceberg​​​ 还是不支持更新的 ​​merge/upsert​​ 操作,并且 ​​iceberg​​ 和 ​​flink​​ 目前已经在合作,据说社区内部已经在把该特性的优先级放到最高了,估计会在​​21​​ 年中附近出来,而其它两款是明确支持更新操作的。我调研了一番后,发现还是 ​​databricks​​ 公司开源的 ​​delta lake​​ 上手比较简单,继承了了 ​​spark​​ 的易用性。

该种架构最简单粗暴,直接使用 ​​spark​​​ 或者 ​​flink​​​ 解析 ​​binlog​​​ 日志,然后将数据处理后写入到 ​​hdfs​​​ ,但是问题是一般 ​​mysql​​​ 都是 ​​dba​​​ 在维护,不会允许个人或者某个团队直接读取​​mysql binlog​​​,这样可能会导致重复​​binlog​​​ 消费,对 ​​mysql​​​ 压力较大。最终会由 ​​dba​​​ 或者中间件团队统一解析​​binlog​​​,然后将数据投递到 ​​kafka​​​ 等 ​​mq​​ 队列,供其它业务团队消费。所以可能的架构应该是这样

由于 ​​delta lake​​​ 和 ​​spark​​​ 都是由 ​​databricks​​​ 公司开源的,并且强绑定​​spark​​​,所以只能使用 ​​spark​​ 来操作

读取kafka解析后的消息

使用 ​​spark structured streaming​​​ 我们可以很简单的消费 ​​kafka​​ 的数据

// kafka source Dataset source = sparkSession.readStream() .format("kafka") .option("kafka.bootstrap.servers", config.getKafkaServer()) .option("failOnDataLoss", "true") .option("startingOffsets", "latest") .option("groupIdPrefix", "inc") .option("subscribe", config.getTopic()) .option("kafka.max.poll.interval.ms", Integer.MAX_VALUE) .load().selectExpr("cast(key as string)", "cast(value as string)");

通过这段代码,我们就获得了 ​​binlog​​​ 的数据,其中配置了​​failOnDataLoss=true​​ 来保证我们的offset如果无法读取就强制失败,提醒负责人数据丢失

设置checkpoint地址以及触发间隔

source.writeStream() .format("delta") .foreachBatch(new BatchWriteFunction(config)) .trigger(Trigger.ProcessingTime(config.getTriggerTime(), TimeUnit.MINUTES)) .option("checkpointLocation", config.getCheckPointPath() + "/stream") .queryName("all stream") .start();

由于​​spark structured streaming​​​ 没有直接写入 ​​delta table​​​ 的实现类,所以需要我们使用 ​​foreachBatch​​​ 方式来自己实现,​​foreachBatch​​​ 默认实现了​​at least once​​​ 的语义,由于 ​​delta lake​​​ 支持 ​​acid​​​ 的特性,所以我们的 ​​sink​​​ 端也保证了幂等性,最终实现了 ​​excatly once​​​ 的语义。 继续看​​​BatchWriteFunction​​

sink代码

public class BatchWriteFunction implements VoidFunction2, Long> { private final String updateSql; private final Config config; private ConcurrentHashMap cacheTable; public BatchWriteFunction(Config config) { this.config = config; if (config.isHourPt()) { this.updateSql = "sink.dt = update.dt and sink.hour = update.hour and sink.id = update.id"; } else { this.updateSql = "sink.dt = update.dt and sink.id = update.id"; } cacheTable = new ConcurrentHashMap<>(2); } @Override public void call(Dataset rowDataSet, Long v2) throws Exception { if (rowDataSet.isEmpty()) { log.warn("batch data size is empty ,ignore"); return; } //省略 } private DeltaTable getDeltaTable(Dataset rowDataSet) { DeltaTable deltaTable = cacheTable.get(config.getTableAlias()); if (deltaTable == null) { deltaTable = DeltaTable.forPath(rowDataSet.sparkSession(), config.getSinkPath()); cacheTable.put(config.getTableAlias(), deltaTable); } return deltaTable; }}

核心代码太长先省略,后续分段介绍

​​BatchWriteFunction​​有三个变量

​​updateSql​​​ :更新/插入时​​sql​​ 条件​​config​​ :相关配置信息​​cacheTable​​​:缓存的​​delta table​​

其中在构造函数中通过判断该表是小时分区还是天分区来使用不同的更新/插入条件

具体看重写的 ​​call​​方法,有两个参数

​​rowDataSet​​ :微批时间内的数据​​v2​​​:生成的唯一id,用来写入端持久化使用来保证​​excatly once​​ 语义。

在 ​​rowDataSet​​​ 为空时,我们可以直接返回,如果在这里有多个sink,或者多个action算子时,建议使用​​rowDataSet.persist​​​ 持久化一下 在 ​​​rowDataSet​​​ 不为空时,由于 ​​rowDataSet​​​ 是 ​​json​​ 格式的数据,我们需要解析一下

dataset = rowDataSet.sparkSession().read().json(rowDataSet);

这样我们就将 ​​json​​​ 数据转换为具有 ​​schema​​​ 的 ​​Row​​ 数据集

由于 ​​delta lake​​​ 不支持写入时有重复的 ​​id​​ 进行更新同一条数据,所以我们需要做一下去重操作

= dataset.schema(); StringBuilder otherCols = new StringBuilder(); String id = Constants.ID; String updateTime = Constants.UPDATE_TIME; boolean idExists = false, updateTimeExists = false; String[] fieldNames = schema.fieldNames(); int len = fieldNames.length; String fieldName; for (int i = 0; i < len; i++) { fieldName = fieldNames[i]; if (fieldName.equals(id)) { idExists = true; } else if (fieldName.equals(updateTime)) { updateTimeExists = true; } else { if (i != len - 1) { otherCols.append(fieldName).append(", "); } else { otherCols.append(fieldName); } } } if (!idExists || !updateTimeExists) { rowDataSet.show(false); throw new Exception("主键或者更新字段找不到:" + JSONObject.toJSONString(schema.fieldNames())); } otherCols.insert(0, "struct(" + updateTime + ","); otherCols.append(") as otherCols"); //4 Dataset latestChange = dataset.toDF().selectExpr("id", otherCols.toString()) .groupBy("id") .agg(max("otherCols").as("latest")) .selectExpr("id", "latest.*");

代码不复杂,主要是根据 ​​id​​​ 进行 ​​group by​​​ 然后取时间戳最新的那条数据。需要注意的是:​​otherCols​​​对象的第一个字段要是数据的更新字段,如:​​gmt_modified​​

dtList = latestChange.toJavaRDD() .groupBy((Function) v1 -> v1.getAs(Constants.DT)) .keys().collect(); StringBuilder dtStr = new StringBuilder(); int size = dtList.size(); if (size == 1) { dtStr.append("sink.dt = '").append(dtList.get(0)).append("'"); } else { dtStr.append("( sink.dt = '").append(dtList.get(0)).append("'"); for (int i = 1; i < size; i++) { dtStr.append(" or sink.dt = '").append(dtList.get(i)).append("'"); } dtStr.append(") "); }

去重之后,我们为了使用 ​​delta lake​​​ 提供的 ​​snapshot isolation​​​特性来加快​​delta lake​​​的 ​​merge​​ 操作,还对这些数据进行了分区聚合

最后进行 ​​merge​​ 操作

= getDeltaTable(rowDataSet); //持续写入避免冲突,绝大部分冲突都在delta内部解决了 while (true) { try { //7 deltaTable.as("sink") .merge(latestChange.as("update"), dtStr.toString() + " and " + this.updateSql) .whenMatched(String.format("update.%s > sink.%s", Constants.UPDATE_TIME, Constants.UPDATE_TIME)) .updateAll() .whenNotMatched() .insertAll() .execute(); break; } catch (DeltaConcurrentModificationException e) { log.error("merge data failed:{}", e.getMessage()); } }

至于为什么用 ​​while​​​ 循环写入,是为了保证写入冲突时,程序不挂掉。 将​​​deltaTable​​​表做为​​sink​​​表,​​latestChange​​​做为最终要更新的​​update​​​数据,匹配条件是过滤的分区以及更新条件,当能够匹配时判断 ​​update​​​ 的更新时间是否最新,如果最新则更新所有字段。当从sink表找不到数据,即无法匹配时,直接 ​​insertAll​​ 插入所有字段即可。

以上就是所有更新流程。当然日常我们其实也可能有其它问题,比如第一次的初始化数据怎么做,等等。

其它问题

1.如何初始化历史数据 最简单的方法当然是更新 ​​​mysql​​​ 的所有时间戳,把 ​​binlog​​​ 重发一下。使用同步工具把mysql的数据同步到 ​​kafka​​​,然后在 ​​spark​​​ 程序中把增量 ​​kafka​​​数据和历史 ​​kafka​​​ 数据进行一个 ​​union​​​ 写入到 ​​delta lake​​​.该种方式还有一些优点,可以在后续 ​​binlog​​​ 如果解析数据异常丢失,可以继续同步历史到 ​​kafka​​​,重新消费,无需更新 ​​mysql​​​ 数据 2.表的 ​​​schema​​​ 怎么定义的 由于我们 ​​​merge​​​ 操作使用了 ​​updatAll​​​ 和 ​​insertAll​​​,所以我们开启​​delta lake​​​的 ​​Automatic schema evolution​​​,该特性会自动同步我们 ​​mysql​​​ 中新加的字段到 ​​delta lake​​​ 表 3.小文件问题怎么解决 在创建 ​​​sparkSession​​​ 时设置​​spark.delta.merge.repartitionBeforeWrite=true​​​,然后配置​​spark.sql.shuffle.partitions​​​大小即可,但是​​spark.sql.shuffle.partitions​​​影响着我们shuffling 操作的分区数,如果太小可能会导致任务执行过慢,太大又会导致小文件数过多,大家要酌情配置。 另外,我们可以通过其它方式合并文件数,比如我在写入程序里新增了一个​​​kafka source​​

configStream = sparkSession.readStream() .format("kafka") .option("kafka.bootstrap.servers", config.getDataKafkaServer()) .option("failOnDataLoss", "false") .option("startingOffsets", "latest") .option("subscribe", Constants.CONFIG_TOPIC) .option("groupIdPrefix", "conf") .option("kafka.max.poll.interval.ms", Integer.MAX_VALUE) .load().selectExpr("cast(value as string)"); //配置只需要一个并行度处理即可 configStream.repartition(1) .writeStream() .foreachBatch(new CompactionFunction(config)) .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES)) .option("checkpointLocation", config.getCheckPointPath() + "/config") .queryName("config stream") .start();

然后通过在其它平台定期发送合并前一天分区的命令来合并历史分区文件数

.sparkSession().read() .format("delta") .load(config.getSinkPath()) .where(partition) .repartition(num) .write() .option("dataChange", "false") .format("delta") .mode("overwrite") .partitionBy(config.isHourPt() ? "dt,hour" : "dt") .option("replaceWhere", partition) .save(config.getSinkPath());

核心,大概就这么多。有问题大家可以留言评论

版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。

上一篇:Flink实战之StreamingFileSink如何写数据到其它HA的Hadoop集群
下一篇:学习PHP到底要学习哪些东西?(学php需要什么基础知识)
相关文章

 发表评论

暂时没有评论,来抢沙发吧~