seata源码解析:seata AT模式是如何实现的?
seata源码解析:seata AT模式是如何实现的?
AT模式
AT模式是seata主推的一种模式,对业务无侵入,用户只需要关系自己的业务sql即可,用户的业务sql就是全局事务的一阶段,Seata会自动生成事务的二阶段提交和回滚操作。
那么它是如何做到对业务无侵入的?
seata对业务无侵入是通过数据源代理实现的。
一阶段:
在at模式中需要在客户端的数据库中建如下的表,用来保存回滚的数据
CREATE TABLE IF NOT EXISTS `undo_log`( `branch_id` BIGINT NOT NULL COMMENT 'branch transaction id', `xid` VARCHAR(128) NOT NULL COMMENT 'global transaction id', `context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization', `rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info', `log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status', `log_created` DATETIME(6) NOT NULL COMMENT 'create datetime', `log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime', UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';
seata的数据源代理通过对业务sql的解析,在业务sql执行前后生成前置镜像和后置镜像保存在undo_log中,业务sql的执行和undo_log的插入在一个本地事务中,这样就可以保证任何对业务数据的更新都有相应的回滚日志存在。
在本地事务提交之前还会构建lockKey(本次事务提交影响的数据,lockKey的构建规则之前的文章已经说过了哈),向TC对涉及到的数据加锁,当加锁成功后,才会执行本地事务的commit过程,如果加锁失败,RM会根据重试策略进行重试,如果重试也失败,那么回滚本地事务
二阶段提交
当TM向TC发起全局提交请求时,此时分支事务实际上以及提交了,TC立即释放该全局事务的锁,然后异步调用RM清理回滚日志
二阶段回滚
当RM收到TC发来的回滚请求时,根据xid和branchid找到对应的回滚记录,通过回滚记录生成反向的SQL并执行,完成分支的回滚。当分支回滚结束时,通过TC回滚完成,当所有分支都回滚完成时,才会释放全局事务的锁
RM在回滚的时候会有如下的校验
beforeImmage等于afterImage则不用回滚(数据没有发生变化),否则执行下一步如果当前的记录等于afterImage,则回滚,否则执行下一步如果当前的记录等于beforeImage,则不用会滚了,否则抛出异常(当前记录和beforeImage和afterImage都不相等),说明发生了脏写
为什么会出现回滚失败呢?
有人绕过系统直接操作数据库没有正确配置RM,比如RM执行单独的本地事务,此时可以在相应的方法上加上@GlobalLock注解,让RM在本地事务提交前也需要获取全局锁
AT模式是如何保证隔离性的?
写隔离
seata at 模式的读写隔离是通过全局锁来实现的。官网的例子讲的很清楚,我们引用一下
两个全局事务 tx1 和 tx2,分别对 a 表的 m 字段进行更新操作,m 的初始值 1000。
tx1 先开始,开启本地事务,拿到本地锁,更新操作 m = 1000 - 100 = 900。本地事务提交前,先拿到该记录的全局锁,本地提交释放本地锁。tx2 后开始,开启本地事务,拿到本地锁,更新操作 m = 900 - 100 = 800。本地事务提交前,尝试拿该记录的全局锁,tx1 全局提交前,该记录的全局锁被 tx1 持有,tx2 需要重试等待全局锁。tx1 二阶段全局提交,释放全局锁。tx2 拿到全局锁提交本地事务。
如果 tx1 的二阶段全局回滚,则 tx1 需要重新获取该数据的本地锁,进行反向补偿的更新操作,实现分支的回滚。
此时,如果 tx2 仍在等待该数据的全局锁,同时持有本地锁,则 tx1 的分支回滚会失败。分支的回滚会一直重试,直到 tx2 的全局锁等锁超时,放弃全局锁并回滚本地事务释放本地锁,tx1 的分支回滚最终成功。
因为整个过程全局锁在 tx1 结束前一直是被 tx1 持有的,所以不会发生脏写的问题。
读隔离
seata at 模式默认的隔离级别为读未提交(因为已经提交的sql有可能会回滚)。如果要实现读已提交,select语句需要更改为 SELECT FOR UPDATE 语句。
SELECT FOR UPDATE 语句的执行会申请全局锁,如果全局锁被其他事务持有,则释放本地锁(回滚 SELECT FOR UPDATE 语句的本地执行)并重试。这个过程中,查询是被 block 住的,直到 全局锁 拿到,即读取的相关数据是已提交的,才返回
出于总体性能上的考虑,Seata 目前的方案并没有对所有 SELECT 语句都进行代理,仅针对 FOR UPDATE 的 SELECT 语句
源码分析
在执行sql的过程中,各个代理对象起到的作用如下
ExecuteTemplate#execute执行本地sql
分支事务的执行过程是在StatementProxy,PreparedStatementProxy的execute,executeQuery,executeUpdate等方法中的,而这些方法最终都会执行到ExecuteTemplate#execute方法
public static statementProxy, StatementCallback
根据执行的sql语句的不同选择不同的执行器,然后执行execute方法。只有2个Executor重写了execute方法,即PlainExecutor和BaseTransactionalExecutor。
没有找到对应sql分析器的语句(比如select语句),才会执行PlainExecutor#execute方法,而PlainExecutor#execute方法只是单纯调用原生的Statement来执行sql
其他的语句则会通过BaseTransactionalExecutor#execute方法来执行
// BaseTransactionalExecutorpublic T execute(Object... args) throws Throwable { String xid = RootContext.getXID(); if (xid != null) { statementProxy.getConnectionProxy().bind(xid); } // 方法被 @GlobalLock 修饰,将值设置为true statementProxy.getConnectionProxy().setGlobalLockRequire(RootContext.requireGlobalLock()); return doExecute(args);}
除了select for update语句,其他语句基本上会执行到AbstractDMLBaseExecutor#executeAutoCommitTrue方法
// AbstractDMLBaseExecutorprotected T executeAutoCommitTrue(Object[] args) throws Throwable { ConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); try { connectionProxy.changeAutoCommit(); // 提交事务时会获取锁,当获锁失败时,按照锁重试策略进行重试 return new LockRetryPolicy(connectionProxy).execute(() -> { T result = executeAutoCommitFalse(args); connectionProxy.commit(); return result; }); } catch (Exception e) { // when exception occur in finally,this exception will lost, so just print it here LOGGER.error("execute executeAutoCommitTrue error:{}", e.getMessage(), e); if (!LockRetryPolicy.isLockRetryPolicyBranchRollbackOnConflict()) { connectionProxy.getTargetConnection().rollback(); } throw e; } finally { connectionProxy.getContext().reset(); connectionProxy.setAutoCommit(true); }}
protected T executeAutoCommitFalse(Object[] args) throws Exception { if (!JdbcConstants.MYSQL.equalsIgnoreCase(getDbType()) && isMultiPk()) { throw new NotSupportYetException("multi pk only support mysql!"); } // 查询前置镜像 TableRecords beforeImage = beforeImage(); // 执行sql T result = statementCallback.execute(statementProxy.getTargetStatement(), args); // 查询后置镜像 TableRecords afterImage = afterImage(beforeImage); // 构建undoLog prepareUndoLog(beforeImage, afterImage); return result;}
protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException { if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) { return; } // 修改主键时抛出异常 if (SQLType.UPDATE == sqlRecognizer.getSQLType()) { if (beforeImage.getRows().size() != afterImage.getRows().size()) { throw new ShouldNeverHappenException("Before image size is not equaled to after image size, probably because you updated the primary keys."); } } ConnectionProxy connectionProxy = statementProxy.getConnectionProxy(); TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage; // 构建lockKeys String lockKeys = buildLockKey(lockKeyRecords); if (null != lockKeys) { connectionProxy.appendLockKey(lockKeys); SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage); connectionProxy.appendUndoLog(sqlUndoLog); }}
主要流程就是生成前置镜像,执行业务sql,生成后置镜像,构建lockKey和undoLog,接着就是调用ConnectionProxy来提交事务
构建lockKeys的规则如下,根据执行的sql构建出对应的select语句,找到影响数据的主键值(用seata表中必须有主键)
如果主键是一列,则形式如下
// 表名:主键值1,主键值2account_info:1,2
如果主键是多列,则形式如下
// 表名:主键值1_主键值a,主键值1_主键值baccount_info:1_a,2_b
ConnectionProxy#commit提交本地事务
private void doCommit() throws SQLException { if (context.inGlobalTransaction()) { // 在全局事务中 processGlobalTransactionCommit(); } else if (context.isGlobalLockRequire()) { // 处理被 GlobalLock 修饰的方法 processLocalCommitWithGlobalLocks(); } else { targetConnection.commit(); }}
本次sql提交在一个全局事务中,则注册分支事务,插入undoLog
private void processGlobalTransactionCommit() throws SQLException { try { // 注册分支事务,向tc发一个分支事务注册请求,然后tc在branch_table表里插入一条记录 // 注册的时候tc会看事务提交所需要的资源是否能被加锁 // 如果能,则注册成功 // 如果不能,说明资源正在被别的事务使用,注册失败 register(); } catch (TransactionException e) { recognizeLockKeyConflictException(e, context.buildLockKeys()); } try { // 插入undolog UndoLogManagerFactory.getUndoLogManager(this.getDbType()).flushUndoLogs(this); // 将业务修改 和 undolog 一起插入 targetConnection.commit(); } catch (Throwable ex) { LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex); // 向tc汇报分支状态为失败 report(false); throw new SQLException(ex); } if (IS_REPORT_SUCCESS_ENABLE) { // 向tc汇报分支状态为成功 report(true); } context.reset();}
本次sql提交在一个本地事务中,只不过方法被加了@GlobalLock,需要尝试获取锁,避免脏写
private void processLocalCommitWithGlobalLocks() throws SQLException { // 查询是否能获取到锁 checkLock(context.buildLockKeys()); try { targetConnection.commit(); } catch (Throwable ex) { throw new SQLException(ex); } context.reset();}
SelectForUpdateExecutor
前面的文章我们说过用select for update语句来保证隔离级别为读已提交。SelectForUpdateExecutor就是用来执行select for update语句的。
流程比较简单就不放源码了,先构建出lockKey,然后向TC发送请求看这些记录是否已经被其他事务加锁了,如果被加锁了,则根据重试策略不断重试,如果没被加锁,则正常返回查询的结果
参考博客
[1][2]好文 [3]https://jianshu.com/p/0ed828c2019a
版权声明:本文内容由网络用户投稿,版权归原作者所有,本站不拥有其著作权,亦不承担相应法律责任。如果您发现本站中有涉嫌抄袭或描述失实的内容,请联系我们jiasou666@gmail.com 处理,核实后本网站将在24小时内删除侵权内容。
发表评论
暂时没有评论,来抢沙发吧~